1use std::{
2 backtrace::Backtrace,
3 borrow::Cow,
4 error::Error as StdError,
5 fmt::{Display, Error as FmtError, Formatter},
6 io::{Error as IoError, ErrorKind},
7 string::ToString,
8};
9
10use actix_web::body::BoxBody;
11use actix_web::http::StatusCode;
12use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError};
13use anyhow::Error as AnyError;
14use dbsp::{
15 Error as DbspError,
16 circuit::{LayoutError, circuit_builder::BootstrapInfo},
17 storage::backend::StorageError,
18};
19use feldera_types::{
20 error::{DetailedError, ErrorResponse},
21 runtime_status::RuntimeDesiredStatus,
22 suspend::SuspendError,
23};
24use serde::{Serialize, Serializer, ser::SerializeStruct};
25
26use super::journal::StepError;
27use crate::{DbspDetailedError, format::ParseError, transport::Step};
28
/// Errors describing an invalid pipeline or endpoint configuration.
///
/// The enum is serialized with `#[serde(untagged)]`, i.e. only the contents
/// of the active variant are emitted, without a variant tag. A string code
/// for each variant is available through the `DbspDetailedError` impl.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ConfigError {
    /// The pipeline configuration could not be parsed.
    PipelineConfigParseError {
        /// Stringified parse error.
        error: String,
    },

    /// The format (parser) configuration of an input endpoint could not be parsed.
    ParserConfigParseError {
        /// Name of the offending input endpoint.
        endpoint_name: String,
        /// Stringified parse error.
        error: String,
        /// The configuration text that failed to parse.
        config: String,
    },

    /// The format (encoder) configuration of an output endpoint could not be parsed.
    EncoderConfigParseError {
        /// Name of the offending output endpoint.
        endpoint_name: String,
        /// Stringified parse error.
        error: String,
        /// The configuration text that failed to parse.
        config: String,
    },

    /// An input endpoint with this name already exists.
    DuplicateInputEndpoint {
        endpoint_name: String,
    },

    /// A table with this name was declared more than once.
    DuplicateInputStream {
        stream_name: String,
    },

    /// An output endpoint with this name already exists.
    DuplicateOutputEndpoint {
        endpoint_name: String,
    },

    /// A table or view with this name was declared more than once.
    DuplicateOutputStream {
        stream_name: String,
    },

    /// An input endpoint specifies an input format that is not known.
    UnknownInputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// An output endpoint specifies an output format that is not known.
    UnknownOutputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// An input endpoint specifies an input transport that is not known.
    UnknownInputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// An output endpoint specifies an output transport that is not known.
    UnknownOutputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// An input endpoint refers to a table that is not known.
    UnknownInputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// An output endpoint refers to a table or view that is not known.
    UnknownOutputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// An output endpoint refers to an index name that is not known.
    UnknownIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// An output endpoint names a relation as its index, but that relation
    /// is not an index.
    NotAnIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// The selected format is not supported on this input endpoint.
    InputFormatNotSupported {
        endpoint_name: String,
        /// Explanation of why the format is not supported.
        error: String,
    },

    /// The selected format is not supported on this output endpoint.
    OutputFormatNotSupported {
        endpoint_name: String,
        /// Explanation of why the format is not supported.
        error: String,
    },

    /// The input endpoint does not specify a data format (the `format` field
    /// of the connector configuration is missing).
    InputFormatNotSpecified {
        endpoint_name: String,
    },

    /// The output endpoint does not specify a data format (the `format` field
    /// of the connector configuration is missing).
    OutputFormatNotSpecified {
        endpoint_name: String,
    },

    /// The format configuration of an output endpoint is invalid.
    InvalidEncoderConfig {
        endpoint_name: String,
        error: String,
    },

    /// The format configuration of an input endpoint is invalid.
    InvalidParserConfig {
        endpoint_name: String,
        error: String,
    },

    /// The transport configuration of an endpoint is invalid.
    InvalidTransportConfig {
        endpoint_name: String,
        error: String,
    },

    /// The output buffer configuration of an endpoint is invalid.
    InvalidOutputBufferConfig {
        endpoint_name: String,
        error: String,
    },

    /// The `start_after` dependencies between endpoints form a cycle.
    CyclicDependency {
        /// The cycle as `(endpoint, label)` pairs, in dependency order.
        cycle: Vec<(String, String)>,
    },

    /// An input endpoint has an empty `start_after` field.
    EmptyStartAfter {
        endpoint_name: String,
    },

    /// Fault tolerance is configured, but storage is not enabled.
    FtRequiresStorage,
    /// Fault tolerance is configured, but at least one input adapter is not
    /// fault-tolerant.
    FtRequiresFtInput,

    /// The multihost layout is invalid.
    InvalidLayout(LayoutError),
}
173
// Marker impl: `ConfigError` uses the default `std::error::Error` methods and
// does not expose an underlying `source()`.
impl StdError for ConfigError {}
175
176impl DbspDetailedError for ConfigError {
177 fn error_code(&self) -> Cow<'static, str> {
178 match self {
179 Self::PipelineConfigParseError { .. } => Cow::from("PipelineConfigParseError"),
180 Self::ParserConfigParseError { .. } => Cow::from("ParserConfigParseError"),
181 Self::EncoderConfigParseError { .. } => Cow::from("EncoderConfigParseError"),
182 Self::DuplicateInputEndpoint { .. } => Cow::from("DuplicateInputEndpoint"),
183 Self::DuplicateInputStream { .. } => Cow::from("DuplicateInputStream"),
184 Self::DuplicateOutputEndpoint { .. } => Cow::from("DuplicateOutputEndpoint"),
185 Self::DuplicateOutputStream { .. } => Cow::from("DuplicateOutputStream"),
186 Self::UnknownInputFormat { .. } => Cow::from("UnknownInputFormat"),
187 Self::UnknownOutputFormat { .. } => Cow::from("UnknownOutputFormat"),
188 Self::UnknownInputTransport { .. } => Cow::from("UnknownInputTransport"),
189 Self::UnknownOutputTransport { .. } => Cow::from("UnknownOutputTransport"),
190 Self::UnknownInputStream { .. } => Cow::from("UnknownInputStream"),
191 Self::UnknownOutputStream { .. } => Cow::from("UnknownOutputStream"),
192 Self::UnknownIndex { .. } => Cow::from("UnknownIndex"),
193 Self::NotAnIndex { .. } => Cow::from("NotAnIndex"),
194 Self::InputFormatNotSupported { .. } => Cow::from("InputFormatNotSupported"),
195 Self::OutputFormatNotSupported { .. } => Cow::from("OutputFormatNotSupported"),
196 Self::InputFormatNotSpecified { .. } => Cow::from("InputFormatNotSpecified"),
197 Self::OutputFormatNotSpecified { .. } => Cow::from("OutputFormatNotSpecified"),
198 Self::InvalidEncoderConfig { .. } => Cow::from("InvalidEncoderConfig"),
199 Self::InvalidParserConfig { .. } => Cow::from("InvalidParserConfig"),
200 Self::InvalidTransportConfig { .. } => Cow::from("InvalidTransportConfig"),
201 Self::InvalidOutputBufferConfig { .. } => Cow::from("InvalidOutputBufferConfig"),
202 Self::FtRequiresStorage => Cow::from("FtRequiresStorage"),
203 Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"),
204 Self::CyclicDependency { .. } => Cow::from("CyclicDependency"),
205 Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"),
206 Self::InvalidLayout(_) => Cow::from("LayoutError"),
207 }
208 }
209}
210
211impl Display for ConfigError {
212 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
213 match self {
214 Self::PipelineConfigParseError { error } => {
215 write!(f, "Failed to parse pipeline configuration: {error}")
216 }
217 Self::ParserConfigParseError {
218 endpoint_name,
219 error,
220 config,
221 } => {
222 write!(
223 f,
224 "Error parsing format configuration for input endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
225 )
226 }
227 Self::EncoderConfigParseError {
228 endpoint_name,
229 error,
230 config,
231 } => {
232 write!(
233 f,
234 "Error parsing format configuration for output endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
235 )
236 }
237 Self::DuplicateInputEndpoint { endpoint_name } => {
238 write!(f, "Input endpoint '{endpoint_name}' already exists")
239 }
240 Self::DuplicateInputStream { stream_name } => {
241 write!(f, "Duplicate table name '{stream_name}'")
242 }
243 Self::UnknownInputFormat {
244 endpoint_name,
245 format_name,
246 } => {
247 write!(
248 f,
249 "Input endpoint '{endpoint_name}' specifies unknown input format '{format_name}'"
250 )
251 }
252 Self::UnknownInputTransport {
253 endpoint_name,
254 transport_name,
255 } => {
256 write!(
257 f,
258 "Input endpoint '{endpoint_name}' specifies unknown input transport '{transport_name}'"
259 )
260 }
261 Self::DuplicateOutputEndpoint { endpoint_name } => {
262 write!(f, "Output endpoint '{endpoint_name}' already exists")
263 }
264 Self::DuplicateOutputStream { stream_name } => {
265 write!(f, "Duplicate table or view name '{stream_name}'")
266 }
267 Self::UnknownOutputFormat {
268 endpoint_name,
269 format_name,
270 } => {
271 write!(
272 f,
273 "Output endpoint '{endpoint_name}' specifies unknown output format '{format_name}'"
274 )
275 }
276 Self::UnknownOutputTransport {
277 endpoint_name,
278 transport_name,
279 } => {
280 write!(
281 f,
282 "Output endpoint '{endpoint_name}' specifies unknown output transport '{transport_name}'"
283 )
284 }
285 Self::UnknownInputStream {
286 endpoint_name,
287 stream_name,
288 } => {
289 write!(
290 f,
291 "Input endpoint '{endpoint_name}' specifies unknown table '{stream_name}'"
292 )
293 }
294 Self::UnknownOutputStream {
295 endpoint_name,
296 stream_name,
297 } => {
298 write!(
299 f,
300 "Output endpoint '{endpoint_name}' specifies unknown output table or view '{stream_name}'"
301 )
302 }
303 Self::UnknownIndex {
304 endpoint_name,
305 index_name,
306 } => {
307 write!(
308 f,
309 "Output endpoint '{endpoint_name}' specifies index name '{index_name}'; however, the '{index_name}' relation is not an index"
310 )
311 }
312
313 Self::NotAnIndex {
314 endpoint_name,
315 index_name,
316 } => {
317 write!(
318 f,
319 "Output endpoint '{endpoint_name}' specifies unknown index '{index_name}'"
320 )
321 }
322
323 Self::InputFormatNotSupported {
324 endpoint_name,
325 error,
326 } => {
327 write!(
328 f,
329 "Format not supported on input endpoint '{endpoint_name}': {error}"
330 )
331 }
332 Self::OutputFormatNotSupported {
333 endpoint_name,
334 error,
335 } => {
336 write!(
337 f,
338 "Format not supported on output endpoint '{endpoint_name}': {error}"
339 )
340 }
341 Self::InputFormatNotSpecified { endpoint_name } => {
342 write!(
343 f,
344 "Data format is not specified for input endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
345 )
346 }
347 Self::OutputFormatNotSpecified { endpoint_name } => {
348 write!(
349 f,
350 "Data format is not specified for output endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
351 )
352 }
353 Self::InvalidEncoderConfig {
354 endpoint_name,
355 error,
356 } => {
357 write!(
358 f,
359 "invalid format configuration for output endpoint '{endpoint_name}': {error}"
360 )
361 }
362 Self::InvalidParserConfig {
363 endpoint_name,
364 error,
365 } => {
366 write!(
367 f,
368 "invalid format configuration for input endpoint '{endpoint_name}': {error}"
369 )
370 }
371 Self::InvalidTransportConfig {
372 endpoint_name,
373 error,
374 } => {
375 write!(
376 f,
377 "invalid transport configuration for endpoint '{endpoint_name}': {error}"
378 )
379 }
380 Self::InvalidOutputBufferConfig {
381 endpoint_name,
382 error,
383 } => {
384 write!(
385 f,
386 "invalid output buffer configuration for endpoint '{endpoint_name}': {error}"
387 )
388 }
389 Self::CyclicDependency { cycle } => {
390 let mut cycle = cycle.clone();
391 cycle.push(cycle[0].clone());
392 let tail = cycle[1..]
393 .iter()
394 .map(|(endpoint, label)| {
395 format!("waits for endpoint '{endpoint}' with label '{label}'")
396 })
397 .collect::<Vec<_>>()
398 .join(", which ");
399 write!(
400 f,
401 "cyclic 'start_after' dependency detected: endpoint '{}' with label '{}' {}",
402 cycle[0].0, cycle[0].1, tail
403 )
404 }
405 Self::EmptyStartAfter { endpoint_name } => {
406 write!(
407 f,
408 "empty 'start_after' field for input endpoint '{}'",
409 endpoint_name
410 )
411 }
412 Self::FtRequiresStorage => write!(
413 f,
414 "Fault tolerance is configured, which requires storage, but storage is not enabled"
415 ),
416 Self::FtRequiresFtInput => write!(
417 f,
418 "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter"
419 ),
420 Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"),
421 }
422 }
423}
424
425impl ConfigError {
426 pub fn pipeline_config_parse_error<E>(error: &E) -> Self
427 where
428 E: ToString,
429 {
430 Self::PipelineConfigParseError {
431 error: error.to_string(),
432 }
433 }
434
435 pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
436 where
437 E: ToString,
438 {
439 Self::ParserConfigParseError {
440 endpoint_name: endpoint_name.to_owned(),
441 error: error.to_string(),
442 config: config.to_string(),
443 }
444 }
445
446 pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
447 where
448 E: ToString,
449 {
450 Self::EncoderConfigParseError {
451 endpoint_name: endpoint_name.to_owned(),
452 error: error.to_string(),
453 config: config.to_string(),
454 }
455 }
456
457 pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
458 Self::DuplicateInputEndpoint {
459 endpoint_name: endpoint_name.to_owned(),
460 }
461 }
462
463 pub fn duplicate_input_stream(stream_name: &str) -> Self {
464 Self::DuplicateInputStream {
465 stream_name: stream_name.to_owned(),
466 }
467 }
468
469 pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
470 Self::UnknownInputFormat {
471 endpoint_name: endpoint_name.to_owned(),
472 format_name: format_name.to_owned(),
473 }
474 }
475
476 pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
477 Self::UnknownInputTransport {
478 endpoint_name: endpoint_name.to_owned(),
479 transport_name: transport_name.to_owned(),
480 }
481 }
482
483 pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
484 Self::DuplicateOutputEndpoint {
485 endpoint_name: endpoint_name.to_owned(),
486 }
487 }
488
489 pub fn duplicate_output_stream(stream_name: &str) -> Self {
490 Self::DuplicateOutputStream {
491 stream_name: stream_name.to_owned(),
492 }
493 }
494
495 pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
496 Self::UnknownOutputFormat {
497 endpoint_name: endpoint_name.to_owned(),
498 format_name: format_name.to_owned(),
499 }
500 }
501
502 pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
503 Self::UnknownOutputTransport {
504 endpoint_name: endpoint_name.to_owned(),
505 transport_name: transport_name.to_owned(),
506 }
507 }
508
509 pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
510 Self::UnknownInputStream {
511 endpoint_name: endpoint_name.to_owned(),
512 stream_name: stream_name.to_owned(),
513 }
514 }
515
516 pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
517 Self::UnknownOutputStream {
518 endpoint_name: endpoint_name.to_owned(),
519 stream_name: stream_name.to_owned(),
520 }
521 }
522
523 pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
524 Self::UnknownIndex {
525 endpoint_name: endpoint_name.to_owned(),
526 index_name: index_name.to_owned(),
527 }
528 }
529
530 pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
531 Self::NotAnIndex {
532 endpoint_name: endpoint_name.to_owned(),
533 index_name: index_name.to_owned(),
534 }
535 }
536
537 pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
538 Self::InputFormatNotSupported {
539 endpoint_name: endpoint_name.to_owned(),
540 error: error.to_owned(),
541 }
542 }
543
544 pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
545 Self::OutputFormatNotSupported {
546 endpoint_name: endpoint_name.to_owned(),
547 error: error.to_owned(),
548 }
549 }
550
551 pub fn input_format_not_specified(endpoint_name: &str) -> Self {
552 Self::InputFormatNotSpecified {
553 endpoint_name: endpoint_name.to_owned(),
554 }
555 }
556
557 pub fn output_format_not_specified(endpoint_name: &str) -> Self {
558 Self::OutputFormatNotSpecified {
559 endpoint_name: endpoint_name.to_owned(),
560 }
561 }
562
563 pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
564 Self::InvalidEncoderConfig {
565 endpoint_name: endpoint_name.to_string(),
566 error: error.to_string(),
567 }
568 }
569
570 pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
571 Self::InvalidParserConfig {
572 endpoint_name: endpoint_name.to_string(),
573 error: error.to_string(),
574 }
575 }
576
577 pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
578 Self::InvalidTransportConfig {
579 endpoint_name: endpoint_name.to_string(),
580 error: error.to_string(),
581 }
582 }
583
584 pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
585 Self::InvalidOutputBufferConfig {
586 endpoint_name: endpoint_name.to_string(),
587 error: error.to_string(),
588 }
589 }
590
591 pub fn cyclic_dependency(cycle: Vec<(String, String)>) -> Self {
592 Self::CyclicDependency { cycle }
593 }
594
595 pub fn empty_start_after(endpoint_name: &str) -> Self {
596 Self::EmptyStartAfter {
597 endpoint_name: endpoint_name.to_string(),
598 }
599 }
600}
601
/// Errors reported by the controller.
///
/// The enum is serialized with `#[serde(untagged)]`, i.e. only the contents
/// of the active variant are emitted, without a variant tag. Variants that
/// hold values without a `Serialize` impl of their own (I/O errors, `anyhow`
/// errors, backtraces) are serialized by the dedicated `serialize_*` helpers
/// in this module.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ControllerError {
    /// An I/O operation failed.
    #[serde(serialize_with = "serialize_io_error")]
    IoError {
        /// Description of the operation that failed.
        context: String,
        /// The underlying I/O error.
        io_error: IoError,
        /// Backtrace stored alongside the error.
        backtrace: Backtrace,
    },

    /// The program schema could not be parsed.
    SchemaParseError {
        error: String,
    },

    /// The program schema failed validation.
    SchemaValidationError {
        error: String,
    },

    /// A checkpoint file could not be parsed.
    CheckpointParseError {
        error: String,
    },

    /// The pipeline was recovered from a checkpoint that does not match the
    /// current pipeline definition.
    CheckpointDoesNotMatchPipeline,

    /// The operation cannot start because the pipeline is restoring from a
    /// checkpoint.
    RestoreInProgress,

    /// The operation cannot start because the pipeline is bootstrapping.
    BootstrapInProgress,

    /// Error with persistent input steps.
    StepError(StepError),

    /// A step was read that differs from the expected one.
    UnexpectedStep {
        /// The step that was read.
        actual: Step,
        /// The step that was expected.
        expected: Step,
    },

    /// Replay failed; `error` carries the full message.
    ReplayFailure {
        error: String,
    },

    /// The requested operation is not supported.
    NotSupported {
        error: String,
    },

    /// The program IR could not be parsed.
    IrParseError {
        error: String,
    },

    /// The command line arguments could not be parsed.
    CliArgsError {
        error: String,
    },

    /// The controller configuration is invalid.
    Config {
        config_error: Box<ConfigError>,
    },

    /// No input endpoint with this name exists.
    UnknownInputEndpoint {
        endpoint_name: String,
    },

    /// A preprocessor could not be created for the endpoint.
    PreprocessorCreateError {
        endpoint_name: String,
        error: String,
    },

    /// No output endpoint with this name exists.
    UnknownOutputEndpoint {
        endpoint_name: String,
    },

    /// Parse error on an input endpoint.
    ParseError {
        endpoint_name: String,
        error: Box<ParseError>,
    },

    /// Encoder error on an output endpoint.
    #[serde(serialize_with = "serialize_encode_error")]
    EncodeError {
        endpoint_name: String,
        error: AnyError,
    },

    /// Error on an input endpoint.
    #[serde(serialize_with = "serialize_input_transport_error")]
    InputTransportError {
        endpoint_name: String,
        /// Whether the error is fatal (rendered as a `FATAL` prefix).
        fatal: bool,
        error: AnyError,
    },

    /// Error on an output endpoint.
    #[serde(serialize_with = "serialize_output_transport_error")]
    OutputTransportError {
        endpoint_name: String,
        /// Whether the error is fatal (rendered as a `FATAL` prefix).
        fatal: bool,
        error: AnyError,
    },

    /// A command could not be executed on an endpoint.
    CommandError {
        endpoint_name: String,
        error: String,
    },

    /// Error reported by DBSP.
    DbspError {
        error: DbspError,
    },

    /// Error in the Prometheus metrics module.
    PrometheusError {
        error: String,
    },

    /// Panic inside the DBSP runtime.
    DbspPanic,

    /// Panic inside the controller.
    ControllerPanic,

    /// The controller exited before the command could be executed.
    ControllerExit,

    /// A storage operation failed.
    #[serde(serialize_with = "serialize_storage_error")]
    StorageError {
        /// Description of the operation that failed.
        context: String,
        /// The underlying storage error.
        error: StorageError,
        /// Backtrace stored alongside the error.
        backtrace: Box<Backtrace>,
    },

    /// An enterprise-only feature (named by the payload) was requested in the
    /// community edition.
    EnterpriseFeature(&'static str),

    /// Error related to suspending the pipeline; rendered verbatim.
    SuspendError(SuspendError),

    /// A JSON value had an unexpected structure.
    UnexpectedJsonStructure {
        reason: String,
    },

    /// The pipeline was restarted; `error` carries the full message.
    PipelineRestarted {
        error: String,
    },

    /// A completion token refers to an input endpoint id that does not exist.
    UnknownEndpointInCompletionToken {
        endpoint_id: u64,
    },

    /// A checkpoint could not be fetched from the object store.
    CheckpointFetchError {
        error: String,
    },

    /// A checkpoint could not be pushed to the object store.
    CheckpointPushError {
        error: String,
    },

    /// The operation cannot run while a transaction is in progress.
    TransactionInProgress,
    /// The operation requires an active transaction, but none is in progress.
    NoTransactionInProgress,

    /// The initial status given on the command line or read from storage is
    /// not valid.
    InvalidInitialStatus(RuntimeDesiredStatus),

    /// Standby mode cannot be entered; the payload explains why.
    InvalidStandby(&'static str),

    /// The requested pipeline startup transition is invalid.
    InvalidStartupTransition {
        from: RuntimeDesiredStatus,
        to: RuntimeDesiredStatus,
    },

    /// Bootstrapping of the modified pipeline was rejected by the user.
    BootstrapRejectedByUser,

    /// The pipeline was modified since the last checkpoint but cannot be
    /// bootstrapped; `error` lists the reasons.
    BootstrapNotAllowed {
        error: String,
    },

    /// The query plan changed since the last checkpoint and requires
    /// bootstrapping, but the change was not detected in the pipeline
    /// metadata.
    UnexpectedBootstrap {
        bootstrap_info: Option<BootstrapInfo>,
    },

    /// Unexpected runtime version; `error` carries the full message.
    UnexpectedRuntimeVersion {
        error: String,
    },
}
826
827impl ResponseError for ControllerError {
828 fn status_code(&self) -> StatusCode {
829 match self {
830 Self::Config { config_error }
831 if matches!(**config_error, ConfigError::UnknownInputStream { .. }) =>
832 {
833 StatusCode::NOT_FOUND
834 }
835 Self::Config { config_error }
836 if matches!(**config_error, ConfigError::UnknownOutputStream { .. }) =>
837 {
838 StatusCode::NOT_FOUND
839 }
840 Self::Config { .. } => StatusCode::BAD_REQUEST,
841 Self::UnknownInputEndpoint { .. } => StatusCode::NOT_FOUND,
842 Self::UnknownOutputEndpoint { .. } => StatusCode::NOT_FOUND,
843 Self::ParseError { .. } => StatusCode::BAD_REQUEST,
844 Self::NotSupported { .. } => StatusCode::BAD_REQUEST,
845 Self::EnterpriseFeature(_) => StatusCode::NOT_IMPLEMENTED,
846 Self::RestoreInProgress => StatusCode::SERVICE_UNAVAILABLE,
847 Self::BootstrapInProgress => StatusCode::SERVICE_UNAVAILABLE,
848 Self::PipelineRestarted { .. } => StatusCode::GONE,
849 Self::UnknownEndpointInCompletionToken { .. } => StatusCode::GONE,
850 Self::TransactionInProgress => StatusCode::CONFLICT,
851 Self::NoTransactionInProgress => StatusCode::BAD_REQUEST,
852 Self::InvalidInitialStatus(_) => StatusCode::GONE,
853 Self::BootstrapRejectedByUser => StatusCode::CONFLICT,
854 Self::UnexpectedBootstrap { .. } => StatusCode::INTERNAL_SERVER_ERROR,
855 Self::UnexpectedRuntimeVersion { .. } => StatusCode::INTERNAL_SERVER_ERROR,
856 _ => StatusCode::INTERNAL_SERVER_ERROR,
857 }
858 }
859
860 fn error_response(&self) -> HttpResponse<BoxBody> {
861 HttpResponseBuilder::new(self.status_code()).json(ErrorResponse::from_error(self))
862 }
863}
864
865fn serialize_io_error<S>(
866 context: &String,
867 io_error: &IoError,
868 backtrace: &Backtrace,
869 serializer: S,
870) -> Result<S::Ok, S::Error>
871where
872 S: Serializer,
873{
874 let mut ser = serializer.serialize_struct("IoError", 4)?;
875 ser.serialize_field("context", context)?;
876 ser.serialize_field("kind", &io_error.kind().to_string())?;
877 ser.serialize_field("os_error", &io_error.raw_os_error())?;
878 ser.serialize_field("backtrace", &backtrace.to_string())?;
879 ser.end()
880}
881
882fn serialize_storage_error<S>(
883 context: &String,
884 error: &StorageError,
885 backtrace: &Backtrace,
886 serializer: S,
887) -> Result<S::Ok, S::Error>
888where
889 S: Serializer,
890{
891 let mut ser = serializer.serialize_struct("StorageError", 4)?;
892 ser.serialize_field("context", context)?;
893 ser.serialize_field("error", &error.to_string())?;
894 ser.serialize_field("backtrace", &backtrace.to_string())?;
895 ser.end()
896}
897
898fn serialize_encode_error<S>(
899 endpoint: &String,
900 error: &AnyError,
901 serializer: S,
902) -> Result<S::Ok, S::Error>
903where
904 S: Serializer,
905{
906 let mut ser = serializer.serialize_struct("EncodeError", 3)?;
907 ser.serialize_field("endpoint_name", endpoint)?;
908 ser.serialize_field("error", &error.to_string())?;
909 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
910 ser.end()
911}
912
913fn serialize_input_transport_error<S>(
914 endpoint: &String,
915 fatal: &bool,
916 error: &AnyError,
917 serializer: S,
918) -> Result<S::Ok, S::Error>
919where
920 S: Serializer,
921{
922 let mut ser = serializer.serialize_struct("InputTransportError", 4)?;
923 ser.serialize_field("endpoint_name", endpoint)?;
924 ser.serialize_field("fatal", fatal)?;
925 ser.serialize_field("error", &error.to_string())?;
926 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
927 ser.end()
928}
929
930fn serialize_output_transport_error<S>(
931 endpoint: &String,
932 fatal: &bool,
933 error: &AnyError,
934 serializer: S,
935) -> Result<S::Ok, S::Error>
936where
937 S: Serializer,
938{
939 let mut ser = serializer.serialize_struct("OutputTransportError", 4)?;
940 ser.serialize_field("endpoint_name", endpoint)?;
941 ser.serialize_field("fatal", fatal)?;
942 ser.serialize_field("error", &error.to_string())?;
943 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
944 ser.end()
945}
946
947impl DbspDetailedError for ControllerError {
948 fn error_code(&self) -> Cow<'static, str> {
950 match self {
951 Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
952 Self::IoError { .. } => Cow::from("ControllerIoError"),
953 Self::NotSupported { .. } => Cow::from("NotSupported"),
954 Self::SchemaParseError { .. } => Cow::from("SchemaParseError"),
955 Self::SchemaValidationError { .. } => Cow::from("SchemaParseError"),
956 Self::CheckpointParseError { .. } => Cow::from("CheckpointParseError"),
957 Self::CheckpointDoesNotMatchPipeline => Cow::from("CheckpointDoesNotMatchPipeline"),
958 Self::RestoreInProgress => Cow::from("RestoreInProgress"),
959 Self::BootstrapInProgress => Cow::from("BootstrapInProgress"),
960 Self::StepError { .. } => Cow::from("StepError"),
961 Self::UnexpectedStep { .. } => Cow::from("UnexpectedStep"),
962 Self::ReplayFailure { .. } => Cow::from("ReplayFailure"),
963 Self::IrParseError { .. } => Cow::from("IrParseError"),
964 Self::CliArgsError { .. } => Cow::from("ControllerCliArgsError"),
965 Self::Config { config_error } => {
966 Cow::from(format!("ConfigError.{}", config_error.error_code()))
967 }
968 Self::UnknownInputEndpoint { .. } => Cow::from("UnknownInputEndpoint"),
969 Self::UnknownOutputEndpoint { .. } => Cow::from("UnknownOutputEndpoint"),
970 Self::ParseError { .. } => Cow::from("ParseError"),
971 Self::EncodeError { .. } => Cow::from("EncodeError"),
972 Self::InputTransportError { .. } => Cow::from("InputTransportError"),
973 Self::OutputTransportError { .. } => Cow::from("OutputTransportError"),
974 Self::CommandError { .. } => Cow::from("CommandError"),
975 Self::PrometheusError { .. } => Cow::from("PrometheusError"),
976 Self::DbspError { error } => error.error_code(),
977 Self::DbspPanic => Cow::from("DbspPanic"),
978 Self::ControllerPanic => Cow::from("ControllerPanic"),
979 Self::ControllerExit => Cow::from("ControllerExit"),
980 Self::EnterpriseFeature(_) => Cow::from("EnterpriseFeature"),
981 Self::StorageError { .. } => Cow::from("StorageError"),
982 Self::SuspendError(_) => Cow::from("SuspendError"),
983 Self::UnexpectedJsonStructure { .. } => Cow::from("UnexpectedJsonStructure"),
984 Self::PipelineRestarted { .. } => Cow::from("PipelineRestarted"),
985 Self::UnknownEndpointInCompletionToken { .. } => {
986 Cow::from("UnknownEndpointInCompletionToken")
987 }
988 Self::CheckpointFetchError { .. } => Cow::from("CheckpointFetchError"),
989 Self::CheckpointPushError { .. } => Cow::from("CheckpointPushError"),
990 Self::TransactionInProgress => Cow::from("TransactionInProgress"),
991 Self::NoTransactionInProgress => Cow::from("NoTransactionInProgress"),
992 Self::InvalidInitialStatus(_) => Cow::from("InvalidInitialStatus"),
993 Self::InvalidStandby(_) => Cow::from("InvalidStandby"),
994 Self::InvalidStartupTransition { .. } => Cow::from("InvalidStartupTransition"),
995 Self::BootstrapRejectedByUser => Cow::from("BootstrapRejected"),
996 Self::BootstrapNotAllowed { .. } => Cow::from("BootstrapNotAllowed"),
997 Self::UnexpectedBootstrap { .. } => Cow::from("UnexpectedBootstrap"),
998 Self::UnexpectedRuntimeVersion { .. } => Cow::from("UnexpectedRuntimeVersion"),
999 }
1000 }
1001}
1002
1003impl DetailedError for ControllerError {
1004 fn error_code(&self) -> Cow<'static, str> {
1006 DbspDetailedError::error_code(self)
1007 }
1008}
1009
// Marker impl: `ControllerError` uses the default `std::error::Error` methods
// and does not expose an underlying `source()`.
impl StdError for ControllerError {}
1011
1012impl Display for ControllerError {
1013 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
1014 match self {
1015 Self::PreprocessorCreateError {
1016 endpoint_name,
1017 error,
1018 } => {
1019 write!(
1020 f,
1021 "Error creating preprocessor for endpoint {endpoint_name}: {error}"
1022 )
1023 }
1024 Self::IoError {
1025 context, io_error, ..
1026 } => {
1027 write!(f, "I/O error {context}: {io_error}")
1028 }
1029 Self::NotSupported { error } => {
1030 write!(f, "Not supported: {error}")
1031 }
1032 Self::SchemaParseError { error } => {
1033 write!(f, "Error parsing program schema: {error}")
1034 }
1035 Self::SchemaValidationError { error } => {
1036 write!(f, "Error validating program schema: {error}")
1037 }
1038 Self::CheckpointParseError { error } => {
1039 write!(f, "Error parsing checkpoint file: {error}")
1040 }
1041 Self::CheckpointDoesNotMatchPipeline => {
1042 write!(
1043 f,
1044 "Recovery failed: the pipeline has been recovered from a checkpoint, but the checkpoint does not match the current pipeline definition. This can be caused by a corrupted checkpoint or an internal error."
1045 )
1046 }
1047 Self::RestoreInProgress => {
1048 write!(
1049 f,
1050 "Operation cannot be initiated now because the pipeline is restoring from a checkpoint."
1051 )
1052 }
1053 Self::BootstrapInProgress => {
1054 write!(
1055 f,
1056 "Operation cannot be initiated while the pipeline is bootstrapping."
1057 )
1058 }
1059 Self::StepError(error) => write!(f, "Error with persistent input steps: {error}"),
1060 Self::UnexpectedStep { actual, expected } => {
1061 write!(f, "Read step {actual}, expected {expected}")
1062 }
1063 Self::ReplayFailure { error } => {
1064 write!(f, "{error}")
1065 }
1066 Self::IrParseError { error } => {
1067 write!(f, "Error parsing program IR: {error}")
1068 }
1069 Self::CliArgsError { error } => {
1070 write!(f, "Error parsing command line arguments: {error}")
1071 }
1072 Self::Config { config_error } => {
1073 write!(f, "invalid controller configuration: {config_error}")
1074 }
1075 Self::UnknownInputEndpoint { endpoint_name } => {
1076 write!(f, "unknown input endpoint name '{endpoint_name}'")
1077 }
1078 Self::UnknownOutputEndpoint { endpoint_name } => {
1079 write!(f, "unknown output endpoint name '{endpoint_name}'")
1080 }
1081 Self::InputTransportError {
1082 endpoint_name,
1083 fatal,
1084 error,
1085 } => {
1086 write!(
1087 f,
1088 "{}error on input endpoint '{endpoint_name}': {}",
1089 if *fatal { "FATAL " } else { "" },
1090 error.root_cause()
1091 )
1092 }
1093 Self::OutputTransportError {
1094 endpoint_name,
1095 fatal,
1096 error,
1097 } => {
1098 write!(
1099 f,
1100 "{}error on output endpoint '{endpoint_name}': {}",
1101 if *fatal { "FATAL " } else { "" },
1102 error.root_cause()
1103 )
1104 }
1105 Self::CommandError {
1106 endpoint_name,
1107 error,
1108 } => {
1109 write!(
1110 f,
1111 "error executing command on output endpoint '{endpoint_name}': {error}"
1112 )
1113 }
1114 Self::ParseError {
1115 endpoint_name,
1116 error,
1117 } => {
1118 write!(
1119 f,
1120 "parse error on input endpoint '{endpoint_name}': {error}"
1121 )
1122 }
1123 Self::EncodeError {
1124 endpoint_name,
1125 error,
1126 } => {
1127 write!(
1128 f,
1129 "encoder error on output endpoint '{endpoint_name}': {error}"
1130 )
1131 }
1132 Self::PrometheusError { error } => {
1133 write!(f, "Error in the Prometheus metrics module: '{error}'")
1134 }
1135 Self::DbspError { error } => {
1136 write!(f, "DBSP error: {error}")
1137 }
1138 Self::DbspPanic => {
1139 write!(f, "Panic inside the DBSP runtime")
1140 }
1141 Self::ControllerPanic => {
1142 write!(f, "Panic inside the DBSP controller")
1143 }
1144 Self::ControllerExit => {
1145 write!(f, "Controller exited before command could be executed")
1146 }
1147 Self::EnterpriseFeature(feature) => {
1148 write!(
1149 f,
1150 "Cannot use enterprise-only feature ({feature}) in Feldera community edition."
1151 )
1152 }
1153 Self::StorageError { context, error, .. } => {
1154 write!(f, "I/O error {context}: {error}")
1155 }
1156 Self::SuspendError(error) => write!(f, "{error}"),
1157 Self::UnexpectedJsonStructure { reason } => {
1158 write!(f, "An unexpected JSON structure was detected: {reason}")
1159 }
1160 Self::PipelineRestarted { error } => {
1161 write!(f, "{error}")
1162 }
1163 Self::UnknownEndpointInCompletionToken { endpoint_id } => {
1164 write!(
1165 f,
1166 "completion token specifies input endpoint id {endpoint_id}, which doesn't exist; this indicates that the input connector was deleted after the completion token was generated"
1167 )
1168 }
1169 Self::CheckpointFetchError { error } => {
1170 write!(f, "Error fetching checkpoint from object store: {error}")
1171 }
1172 Self::CheckpointPushError { error } => {
1173 write!(f, "Error pushing checkpoint to object store: {error}")
1174 }
1175 Self::TransactionInProgress => {
1176 write!(
1177 f,
1178 "Cannot perform this operation while there is a transaction in progress"
1179 )
1180 }
1181 Self::NoTransactionInProgress => {
1182 write!(
1183 f,
1184 "This operation requires an active transaction, but none is currently in progress"
1185 )
1186 }
1187 Self::InvalidInitialStatus(status) => {
1188 write!(
1189 f,
1190 "Invalid initial status {status:?} provided on command line or read from storage (only coordination, running, paused, and standby are valid)"
1191 )
1192 }
1193 Self::InvalidStandby(standby) => {
1194 write!(f, "Cannot enter standby mode: {standby}")
1195 }
1196 Self::InvalidStartupTransition { from, to } => {
1197 write!(
1198 f,
1199 "Invalid pipeline startup transition from {from:?} to {to:?}"
1200 )
1201 }
1202 Self::BootstrapRejectedByUser => {
1203 write!(
1204 f,
1205 "Bootstrapping of the modified pipeline was rejected by the user."
1206 )
1207 }
1208 Self::BootstrapNotAllowed { error } => {
1209 write!(
1210 f,
1211 "The pipeline has been modified since the last checkpoint; however it cannot be bootstrapped due to the following reasons:\n{error}"
1212 )
1213 }
1214 Self::UnexpectedBootstrap { bootstrap_info } => {
1215 write!(
1216 f,
1217 "The pipeline's query plan was modified since the last checkpoint and requires bootstrapping; however we were not able to detect the changes in the pipeline metadata. This is a bug; please report to the developers. Bootstrap info: {bootstrap_info:?}"
1218 )
1219 }
1220 Self::UnexpectedRuntimeVersion { error } => {
1221 write!(f, "{error}")
1222 }
1223 }
1224 }
1225}
1226
1227impl ControllerError {
1228 pub fn io_error(context: impl Display, io_error: IoError) -> Self {
1229 Self::IoError {
1230 context: context.to_string(),
1231 io_error,
1232 backtrace: Backtrace::capture(),
1233 }
1234 }
1235
1236 pub fn not_supported(error: &str) -> Self {
1237 Self::NotSupported {
1238 error: error.to_string(),
1239 }
1240 }
1241
1242 pub fn schema_parse_error(error: &str) -> Self {
1243 Self::SchemaParseError {
1244 error: error.to_string(),
1245 }
1246 }
1247
1248 pub fn checkpoint_does_not_match_pipeline() -> Self {
1249 Self::CheckpointDoesNotMatchPipeline
1250 }
1251
1252 pub fn checkpoint_fetch_error(error: String) -> Self {
1253 Self::CheckpointFetchError { error }
1254 }
1255
1256 pub fn checkpoint_push_error(error: String) -> Self {
1257 Self::CheckpointPushError { error }
1258 }
1259
1260 pub fn schema_validation_error(error: &str) -> Self {
1261 Self::SchemaValidationError {
1262 error: error.to_string(),
1263 }
1264 }
1265
1266 pub fn ir_parse_error(error: &str) -> Self {
1267 Self::IrParseError {
1268 error: error.to_string(),
1269 }
1270 }
1271
1272 pub fn cli_args_error<E>(error: &E) -> Self
1273 where
1274 E: ToString,
1275 {
1276 Self::CliArgsError {
1277 error: error.to_string(),
1278 }
1279 }
1280
1281 pub fn unknown_input_endpoint(endpoint_name: &str) -> Self {
1282 Self::UnknownInputEndpoint {
1283 endpoint_name: endpoint_name.to_string(),
1284 }
1285 }
1286
1287 pub fn unknown_output_endpoint(endpoint_name: &str) -> Self {
1288 Self::UnknownOutputEndpoint {
1289 endpoint_name: endpoint_name.to_string(),
1290 }
1291 }
1292
1293 pub fn pipeline_config_parse_error<E>(error: &E) -> Self
1294 where
1295 E: ToString,
1296 {
1297 Self::Config {
1298 config_error: Box::new(ConfigError::pipeline_config_parse_error(error)),
1299 }
1300 }
1301
1302 pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1303 where
1304 E: ToString,
1305 {
1306 Self::Config {
1307 config_error: Box::new(ConfigError::parser_config_parse_error(
1308 endpoint_name,
1309 error,
1310 config,
1311 )),
1312 }
1313 }
1314
1315 pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1316 where
1317 E: ToString,
1318 {
1319 Self::Config {
1320 config_error: Box::new(ConfigError::encoder_config_parse_error(
1321 endpoint_name,
1322 error,
1323 config,
1324 )),
1325 }
1326 }
1327
1328 pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
1329 Self::Config {
1330 config_error: Box::new(ConfigError::duplicate_input_endpoint(endpoint_name)),
1331 }
1332 }
1333
1334 pub fn duplicate_input_stream(stream_name: &str) -> Self {
1335 Self::Config {
1336 config_error: Box::new(ConfigError::duplicate_input_stream(stream_name)),
1337 }
1338 }
1339
1340 pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
1341 Self::Config {
1342 config_error: Box::new(ConfigError::unknown_input_format(
1343 endpoint_name,
1344 format_name,
1345 )),
1346 }
1347 }
1348
1349 pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
1350 Self::Config {
1351 config_error: Box::new(ConfigError::unknown_input_transport(
1352 endpoint_name,
1353 transport_name,
1354 )),
1355 }
1356 }
1357
1358 pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
1359 Self::Config {
1360 config_error: Box::new(ConfigError::duplicate_output_endpoint(endpoint_name)),
1361 }
1362 }
1363
1364 pub fn duplicate_output_stream(stream_name: &str) -> Self {
1365 Self::Config {
1366 config_error: Box::new(ConfigError::duplicate_output_stream(stream_name)),
1367 }
1368 }
1369
1370 pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
1371 Self::Config {
1372 config_error: Box::new(ConfigError::unknown_output_format(
1373 endpoint_name,
1374 format_name,
1375 )),
1376 }
1377 }
1378
1379 pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
1380 Self::Config {
1381 config_error: Box::new(ConfigError::unknown_output_transport(
1382 endpoint_name,
1383 transport_name,
1384 )),
1385 }
1386 }
1387
1388 pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
1389 Self::Config {
1390 config_error: Box::new(ConfigError::unknown_input_stream(
1391 endpoint_name,
1392 stream_name,
1393 )),
1394 }
1395 }
1396
1397 pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
1398 Self::Config {
1399 config_error: Box::new(ConfigError::unknown_output_stream(
1400 endpoint_name,
1401 stream_name,
1402 )),
1403 }
1404 }
1405
1406 pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
1407 Self::Config {
1408 config_error: Box::new(ConfigError::unknown_index(endpoint_name, index_name)),
1409 }
1410 }
1411
1412 pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
1413 Self::Config {
1414 config_error: Box::new(ConfigError::not_an_index(endpoint_name, index_name)),
1415 }
1416 }
1417
1418 pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1419 Self::Config {
1420 config_error: Box::new(ConfigError::input_format_not_supported(
1421 endpoint_name,
1422 error,
1423 )),
1424 }
1425 }
1426
1427 pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1428 Self::Config {
1429 config_error: Box::new(ConfigError::output_format_not_supported(
1430 endpoint_name,
1431 error,
1432 )),
1433 }
1434 }
1435
1436 pub fn input_format_not_specified(endpoint_name: &str) -> Self {
1437 Self::Config {
1438 config_error: Box::new(ConfigError::input_format_not_specified(endpoint_name)),
1439 }
1440 }
1441
1442 pub fn output_format_not_specified(endpoint_name: &str) -> Self {
1443 Self::Config {
1444 config_error: Box::new(ConfigError::output_format_not_specified(endpoint_name)),
1445 }
1446 }
1447
1448 pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
1449 Self::Config {
1450 config_error: Box::new(ConfigError::invalid_encoder_configuration(
1451 endpoint_name,
1452 error,
1453 )),
1454 }
1455 }
1456
1457 pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
1458 Self::Config {
1459 config_error: Box::new(ConfigError::invalid_parser_configuration(
1460 endpoint_name,
1461 error,
1462 )),
1463 }
1464 }
1465
1466 pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
1467 Self::Config {
1468 config_error: Box::new(ConfigError::invalid_transport_configuration(
1469 endpoint_name,
1470 error,
1471 )),
1472 }
1473 }
1474
1475 pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
1476 Self::Config {
1477 config_error: Box::new(ConfigError::invalid_output_buffer_configuration(
1478 endpoint_name,
1479 error,
1480 )),
1481 }
1482 }
1483
1484 pub fn input_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1485 Self::InputTransportError {
1486 endpoint_name: endpoint_name.to_owned(),
1487 fatal,
1488 error,
1489 }
1490 }
1491
1492 pub fn output_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1493 Self::OutputTransportError {
1494 endpoint_name: endpoint_name.to_owned(),
1495 fatal,
1496 error,
1497 }
1498 }
1499
1500 pub fn command_error(endpoint_name: &str, error: &str) -> Self {
1501 Self::CommandError {
1502 endpoint_name: endpoint_name.to_owned(),
1503 error: error.to_string(),
1504 }
1505 }
1506
1507 pub fn parse_error(endpoint_name: &str, error: ParseError) -> Self {
1508 Self::ParseError {
1509 endpoint_name: endpoint_name.to_owned(),
1510 error: Box::new(error),
1511 }
1512 }
1513
1514 pub fn encode_error(endpoint_name: &str, error: AnyError) -> Self {
1515 Self::EncodeError {
1516 endpoint_name: endpoint_name.to_owned(),
1517 error,
1518 }
1519 }
1520
1521 pub fn prometheus_error<E>(error: &E) -> Self
1522 where
1523 E: ToString,
1524 {
1525 Self::PrometheusError {
1526 error: error.to_string(),
1527 }
1528 }
1529
1530 pub fn dbsp_error(error: DbspError) -> Self {
1531 Self::DbspError { error }
1532 }
1533
1534 pub fn dbsp_panic() -> Self {
1535 Self::DbspPanic
1536 }
1537
1538 pub fn controller_panic() -> Self {
1539 Self::ControllerPanic
1540 }
1541
1542 pub fn storage_error(context: impl Display, error: StorageError) -> Self {
1543 Self::StorageError {
1544 context: context.to_string(),
1545 error,
1546 backtrace: Box::new(Backtrace::capture()),
1547 }
1548 }
1549
1550 pub fn pipeline_restarted(error: &str) -> Self {
1551 Self::PipelineRestarted {
1552 error: error.to_string(),
1553 }
1554 }
1555
1556 pub fn unknown_endpoint_in_completion_token(endpoint_id: u64) -> Self {
1557 Self::UnknownEndpointInCompletionToken { endpoint_id }
1558 }
1559
1560 pub fn kind(&self) -> ErrorKind {
1561 match self {
1562 Self::IoError { io_error, .. } => io_error.kind(),
1563 Self::StorageError { error, .. } => error.kind(),
1564 Self::StepError(error) => error.kind(),
1565 Self::RestoreInProgress
1566 | Self::BootstrapInProgress
1567 | Self::SuspendError(SuspendError::Temporary(_))
1568 | Self::TransactionInProgress => ErrorKind::ResourceBusy,
1569 Self::NotSupported { .. } | Self::SuspendError(SuspendError::Permanent(_)) => {
1570 ErrorKind::Unsupported
1571 }
1572 Self::DbspError {
1573 error: DbspError::IO(error),
1574 } => error.kind(),
1575 Self::DbspError { .. }
1576 | Self::SchemaParseError { .. }
1577 | Self::SchemaValidationError { .. }
1578 | Self::CheckpointParseError { .. }
1579 | Self::CheckpointDoesNotMatchPipeline
1580 | Self::UnexpectedStep { .. }
1581 | Self::ReplayFailure { .. }
1582 | Self::IrParseError { .. }
1583 | Self::CliArgsError { .. }
1584 | Self::Config { .. }
1585 | Self::UnknownInputEndpoint { .. }
1586 | Self::UnknownOutputEndpoint { .. }
1587 | Self::ParseError { .. }
1588 | Self::EncodeError { .. }
1589 | Self::InputTransportError { .. }
1590 | Self::OutputTransportError { .. }
1591 | Self::CommandError { .. }
1592 | Self::PrometheusError { .. }
1593 | Self::DbspPanic
1594 | Self::ControllerPanic
1595 | Self::ControllerExit
1596 | Self::EnterpriseFeature(_)
1597 | Self::UnexpectedJsonStructure { .. }
1598 | Self::UnknownEndpointInCompletionToken { .. }
1599 | Self::CheckpointFetchError { .. }
1600 | Self::CheckpointPushError { .. }
1601 | Self::PipelineRestarted { .. }
1602 | Self::NoTransactionInProgress
1603 | Self::InvalidInitialStatus(_)
1604 | Self::BootstrapRejectedByUser
1605 | Self::BootstrapNotAllowed { .. }
1606 | Self::UnexpectedBootstrap { .. }
1607 | Self::InvalidStandby(_)
1608 | Self::InvalidStartupTransition { .. }
1609 | Self::PreprocessorCreateError { .. }
1610 | Self::UnexpectedRuntimeVersion { .. } => ErrorKind::Other,
1611 }
1612 }
1613}
1614
1615impl From<ConfigError> for ControllerError {
1616 fn from(config_error: ConfigError) -> Self {
1617 Self::Config {
1618 config_error: Box::new(config_error),
1619 }
1620 }
1621}
1622
1623impl From<DbspError> for ControllerError {
1624 fn from(error: DbspError) -> Self {
1625 Self::DbspError { error }
1626 }
1627}
1628
1629impl From<SuspendError> for ControllerError {
1630 fn from(error: SuspendError) -> Self {
1631 Self::SuspendError(error)
1632 }
1633}