Skip to main content

feldera_adapterlib/errors/
controller.rs

1use std::{
2    backtrace::Backtrace,
3    borrow::Cow,
4    error::Error as StdError,
5    fmt::{Display, Error as FmtError, Formatter},
6    io::{Error as IoError, ErrorKind},
7    string::ToString,
8};
9
10use actix_web::body::BoxBody;
11use actix_web::http::StatusCode;
12use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError};
13use anyhow::Error as AnyError;
14use dbsp::{
15    Error as DbspError,
16    circuit::{LayoutError, circuit_builder::BootstrapInfo},
17    storage::backend::StorageError,
18};
19use feldera_types::{
20    error::{DetailedError, ErrorResponse},
21    runtime_status::RuntimeDesiredStatus,
22    suspend::SuspendError,
23};
24use serde::{Serialize, Serializer, ser::SerializeStruct};
25
26use super::journal::StepError;
27use crate::{DbspDetailedError, format::ParseError, transport::Step};
28
/// Controller configuration error.
///
/// Serialized without a variant tag (`#[serde(untagged)]`), so only the
/// fields of the active variant appear in the serialized form.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ConfigError {
    /// Failed to parse pipeline configuration.
    PipelineConfigParseError {
        error: String,
    },

    /// Failed to parse parser configuration for an endpoint.
    ParserConfigParseError {
        endpoint_name: String,
        error: String,
        config: String,
    },

    /// Failed to parse encoder configuration for an endpoint.
    EncoderConfigParseError {
        endpoint_name: String,
        error: String,
        config: String,
    },

    /// Input endpoint with this name already exists.
    DuplicateInputEndpoint {
        endpoint_name: String,
    },

    /// Input table with this name already exists.
    DuplicateInputStream {
        stream_name: String,
    },

    /// Output endpoint with this name already exists.
    DuplicateOutputEndpoint {
        endpoint_name: String,
    },

    /// Output view with this name already exists.
    DuplicateOutputStream {
        stream_name: String,
    },

    /// Endpoint configuration specifies unknown input format name.
    UnknownInputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// Endpoint configuration specifies unknown output format name.
    UnknownOutputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// Endpoint configuration specifies unknown input transport name.
    UnknownInputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// Endpoint configuration specifies unknown output transport name.
    UnknownOutputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// Endpoint configuration specifies an input stream name
    /// that is not found in the circuit catalog.
    UnknownInputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// Endpoint configuration specifies an output stream name
    /// that is not found in the circuit catalog.
    UnknownOutputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// Endpoint configuration specifies an index name
    /// that is not found in the circuit catalog.
    UnknownIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// Endpoint configuration specifies an index name that refers to a
    /// relation that is not an index.
    NotAnIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// Format is not supported on the input endpoint.
    InputFormatNotSupported {
        endpoint_name: String,
        error: String,
    },

    /// Format is not supported on the output endpoint.
    OutputFormatNotSupported {
        endpoint_name: String,
        error: String,
    },

    /// Data format is not specified for the input endpoint.
    InputFormatNotSpecified {
        endpoint_name: String,
    },

    /// Data format is not specified for the output endpoint.
    OutputFormatNotSpecified {
        endpoint_name: String,
    },

    /// Invalid format configuration for an output endpoint.
    InvalidEncoderConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid format configuration for an input endpoint.
    InvalidParserConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid transport configuration for an endpoint.
    InvalidTransportConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid output buffer configuration for an endpoint.
    InvalidOutputBufferConfig {
        endpoint_name: String,
        error: String,
    },

    /// Cyclic `start_after` dependency between endpoints.
    ///
    /// Each element of `cycle` is an `(endpoint, label)` pair; the `Display`
    /// implementation reports each entry as waiting for the next one.
    CyclicDependency {
        cycle: Vec<(String, String)>,
    },

    /// Empty `start_after` field for an input endpoint.
    EmptyStartAfter {
        endpoint_name: String,
    },

    /// Fault tolerance is configured, which requires storage, but storage is
    /// not enabled.
    FtRequiresStorage,
    /// Fault tolerance is configured, but the pipeline has at least one
    /// non-fault-tolerant input adapter.
    FtRequiresFtInput,

    /// `datafusion_memory_mb` was set to a value greater than or equal to
    /// the pipeline's effective memory budget, which would leave no memory
    /// for the DBSP circuit.
    DatafusionMemoryExceedsBudget {
        datafusion_memory_mb: u64,
        max_rss_mb: u64,
    },

    /// Multihost layout error.
    InvalidLayout(LayoutError),
}
181
// Empty impl: relies entirely on the default `StdError` method implementations.
impl StdError for ConfigError {}
183
184impl DbspDetailedError for ConfigError {
185    fn error_code(&self) -> Cow<'static, str> {
186        match self {
187            Self::PipelineConfigParseError { .. } => Cow::from("PipelineConfigParseError"),
188            Self::ParserConfigParseError { .. } => Cow::from("ParserConfigParseError"),
189            Self::EncoderConfigParseError { .. } => Cow::from("EncoderConfigParseError"),
190            Self::DuplicateInputEndpoint { .. } => Cow::from("DuplicateInputEndpoint"),
191            Self::DuplicateInputStream { .. } => Cow::from("DuplicateInputStream"),
192            Self::DuplicateOutputEndpoint { .. } => Cow::from("DuplicateOutputEndpoint"),
193            Self::DuplicateOutputStream { .. } => Cow::from("DuplicateOutputStream"),
194            Self::UnknownInputFormat { .. } => Cow::from("UnknownInputFormat"),
195            Self::UnknownOutputFormat { .. } => Cow::from("UnknownOutputFormat"),
196            Self::UnknownInputTransport { .. } => Cow::from("UnknownInputTransport"),
197            Self::UnknownOutputTransport { .. } => Cow::from("UnknownOutputTransport"),
198            Self::UnknownInputStream { .. } => Cow::from("UnknownInputStream"),
199            Self::UnknownOutputStream { .. } => Cow::from("UnknownOutputStream"),
200            Self::UnknownIndex { .. } => Cow::from("UnknownIndex"),
201            Self::NotAnIndex { .. } => Cow::from("NotAnIndex"),
202            Self::InputFormatNotSupported { .. } => Cow::from("InputFormatNotSupported"),
203            Self::OutputFormatNotSupported { .. } => Cow::from("OutputFormatNotSupported"),
204            Self::InputFormatNotSpecified { .. } => Cow::from("InputFormatNotSpecified"),
205            Self::OutputFormatNotSpecified { .. } => Cow::from("OutputFormatNotSpecified"),
206            Self::InvalidEncoderConfig { .. } => Cow::from("InvalidEncoderConfig"),
207            Self::InvalidParserConfig { .. } => Cow::from("InvalidParserConfig"),
208            Self::InvalidTransportConfig { .. } => Cow::from("InvalidTransportConfig"),
209            Self::InvalidOutputBufferConfig { .. } => Cow::from("InvalidOutputBufferConfig"),
210            Self::FtRequiresStorage => Cow::from("FtRequiresStorage"),
211            Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"),
212            Self::CyclicDependency { .. } => Cow::from("CyclicDependency"),
213            Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"),
214            Self::DatafusionMemoryExceedsBudget { .. } => {
215                Cow::from("DatafusionMemoryExceedsBudget")
216            }
217            Self::InvalidLayout(_) => Cow::from("LayoutError"),
218        }
219    }
220}
221
222impl Display for ConfigError {
223    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
224        match self {
225            Self::PipelineConfigParseError { error } => {
226                write!(f, "Failed to parse pipeline configuration: {error}")
227            }
228            Self::ParserConfigParseError {
229                endpoint_name,
230                error,
231                config,
232            } => {
233                write!(
234                    f,
235                    "Error parsing format configuration for input endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
236                )
237            }
238            Self::EncoderConfigParseError {
239                endpoint_name,
240                error,
241                config,
242            } => {
243                write!(
244                    f,
245                    "Error parsing format configuration for output endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
246                )
247            }
248            Self::DuplicateInputEndpoint { endpoint_name } => {
249                write!(f, "Input endpoint '{endpoint_name}' already exists")
250            }
251            Self::DuplicateInputStream { stream_name } => {
252                write!(f, "Duplicate table name '{stream_name}'")
253            }
254            Self::UnknownInputFormat {
255                endpoint_name,
256                format_name,
257            } => {
258                write!(
259                    f,
260                    "Input endpoint '{endpoint_name}' specifies unknown input format '{format_name}'"
261                )
262            }
263            Self::UnknownInputTransport {
264                endpoint_name,
265                transport_name,
266            } => {
267                write!(
268                    f,
269                    "Input endpoint '{endpoint_name}' specifies unknown input transport '{transport_name}'"
270                )
271            }
272            Self::DuplicateOutputEndpoint { endpoint_name } => {
273                write!(f, "Output endpoint '{endpoint_name}' already exists")
274            }
275            Self::DuplicateOutputStream { stream_name } => {
276                write!(f, "Duplicate table or view name '{stream_name}'")
277            }
278            Self::UnknownOutputFormat {
279                endpoint_name,
280                format_name,
281            } => {
282                write!(
283                    f,
284                    "Output endpoint '{endpoint_name}' specifies unknown output format '{format_name}'"
285                )
286            }
287            Self::UnknownOutputTransport {
288                endpoint_name,
289                transport_name,
290            } => {
291                write!(
292                    f,
293                    "Output endpoint '{endpoint_name}' specifies unknown output transport '{transport_name}'"
294                )
295            }
296            Self::UnknownInputStream {
297                endpoint_name,
298                stream_name,
299            } => {
300                write!(
301                    f,
302                    "Input endpoint '{endpoint_name}' specifies unknown table '{stream_name}'"
303                )
304            }
305            Self::UnknownOutputStream {
306                endpoint_name,
307                stream_name,
308            } => {
309                write!(
310                    f,
311                    "Output endpoint '{endpoint_name}' specifies unknown output table or view '{stream_name}'"
312                )
313            }
314            Self::UnknownIndex {
315                endpoint_name,
316                index_name,
317            } => {
318                write!(
319                    f,
320                    "Output endpoint '{endpoint_name}' specifies index name '{index_name}'; however, the '{index_name}' relation is not an index"
321                )
322            }
323
324            Self::NotAnIndex {
325                endpoint_name,
326                index_name,
327            } => {
328                write!(
329                    f,
330                    "Output endpoint '{endpoint_name}' specifies unknown index '{index_name}'"
331                )
332            }
333
334            Self::InputFormatNotSupported {
335                endpoint_name,
336                error,
337            } => {
338                write!(
339                    f,
340                    "Format not supported on input endpoint '{endpoint_name}': {error}"
341                )
342            }
343            Self::OutputFormatNotSupported {
344                endpoint_name,
345                error,
346            } => {
347                write!(
348                    f,
349                    "Format not supported on output endpoint '{endpoint_name}': {error}"
350                )
351            }
352            Self::InputFormatNotSpecified { endpoint_name } => {
353                write!(
354                    f,
355                    "Data format is not specified for input endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
356                )
357            }
358            Self::OutputFormatNotSpecified { endpoint_name } => {
359                write!(
360                    f,
361                    "Data format is not specified for output endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
362                )
363            }
364            Self::InvalidEncoderConfig {
365                endpoint_name,
366                error,
367            } => {
368                write!(
369                    f,
370                    "invalid format configuration for output endpoint '{endpoint_name}': {error}"
371                )
372            }
373            Self::InvalidParserConfig {
374                endpoint_name,
375                error,
376            } => {
377                write!(
378                    f,
379                    "invalid format configuration for input endpoint '{endpoint_name}': {error}"
380                )
381            }
382            Self::InvalidTransportConfig {
383                endpoint_name,
384                error,
385            } => {
386                write!(
387                    f,
388                    "invalid transport configuration for endpoint '{endpoint_name}': {error}"
389                )
390            }
391            Self::InvalidOutputBufferConfig {
392                endpoint_name,
393                error,
394            } => {
395                write!(
396                    f,
397                    "invalid output buffer configuration for endpoint '{endpoint_name}': {error}"
398                )
399            }
400            Self::CyclicDependency { cycle } => {
401                let mut cycle = cycle.clone();
402                cycle.push(cycle[0].clone());
403                let tail = cycle[1..]
404                    .iter()
405                    .map(|(endpoint, label)| {
406                        format!("waits for endpoint '{endpoint}' with label '{label}'")
407                    })
408                    .collect::<Vec<_>>()
409                    .join(", which ");
410                write!(
411                    f,
412                    "cyclic 'start_after' dependency detected: endpoint '{}' with label '{}' {}",
413                    cycle[0].0, cycle[0].1, tail
414                )
415            }
416            Self::EmptyStartAfter { endpoint_name } => {
417                write!(
418                    f,
419                    "empty 'start_after' field for input endpoint '{}'",
420                    endpoint_name
421                )
422            }
423            Self::FtRequiresStorage => write!(
424                f,
425                "Fault tolerance is configured, which requires storage, but storage is not enabled"
426            ),
427            Self::FtRequiresFtInput => write!(
428                f,
429                "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter"
430            ),
431            Self::DatafusionMemoryExceedsBudget {
432                datafusion_memory_mb,
433                max_rss_mb,
434            } => write!(
435                f,
436                "'datafusion_memory_mb' ({datafusion_memory_mb} MB) must be less than the pipeline's memory budget ({max_rss_mb} MB); the difference is the budget available to the DBSP circuit"
437            ),
438            Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"),
439        }
440    }
441}
442
443impl ConfigError {
444    pub fn pipeline_config_parse_error<E>(error: &E) -> Self
445    where
446        E: ToString,
447    {
448        Self::PipelineConfigParseError {
449            error: error.to_string(),
450        }
451    }
452
453    pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
454    where
455        E: ToString,
456    {
457        Self::ParserConfigParseError {
458            endpoint_name: endpoint_name.to_owned(),
459            error: error.to_string(),
460            config: config.to_string(),
461        }
462    }
463
464    pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
465    where
466        E: ToString,
467    {
468        Self::EncoderConfigParseError {
469            endpoint_name: endpoint_name.to_owned(),
470            error: error.to_string(),
471            config: config.to_string(),
472        }
473    }
474
475    pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
476        Self::DuplicateInputEndpoint {
477            endpoint_name: endpoint_name.to_owned(),
478        }
479    }
480
481    pub fn duplicate_input_stream(stream_name: &str) -> Self {
482        Self::DuplicateInputStream {
483            stream_name: stream_name.to_owned(),
484        }
485    }
486
487    pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
488        Self::UnknownInputFormat {
489            endpoint_name: endpoint_name.to_owned(),
490            format_name: format_name.to_owned(),
491        }
492    }
493
494    pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
495        Self::UnknownInputTransport {
496            endpoint_name: endpoint_name.to_owned(),
497            transport_name: transport_name.to_owned(),
498        }
499    }
500
501    pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
502        Self::DuplicateOutputEndpoint {
503            endpoint_name: endpoint_name.to_owned(),
504        }
505    }
506
507    pub fn duplicate_output_stream(stream_name: &str) -> Self {
508        Self::DuplicateOutputStream {
509            stream_name: stream_name.to_owned(),
510        }
511    }
512
513    pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
514        Self::UnknownOutputFormat {
515            endpoint_name: endpoint_name.to_owned(),
516            format_name: format_name.to_owned(),
517        }
518    }
519
520    pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
521        Self::UnknownOutputTransport {
522            endpoint_name: endpoint_name.to_owned(),
523            transport_name: transport_name.to_owned(),
524        }
525    }
526
527    pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
528        Self::UnknownInputStream {
529            endpoint_name: endpoint_name.to_owned(),
530            stream_name: stream_name.to_owned(),
531        }
532    }
533
534    pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
535        Self::UnknownOutputStream {
536            endpoint_name: endpoint_name.to_owned(),
537            stream_name: stream_name.to_owned(),
538        }
539    }
540
541    pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
542        Self::UnknownIndex {
543            endpoint_name: endpoint_name.to_owned(),
544            index_name: index_name.to_owned(),
545        }
546    }
547
548    pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
549        Self::NotAnIndex {
550            endpoint_name: endpoint_name.to_owned(),
551            index_name: index_name.to_owned(),
552        }
553    }
554
555    pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
556        Self::InputFormatNotSupported {
557            endpoint_name: endpoint_name.to_owned(),
558            error: error.to_owned(),
559        }
560    }
561
562    pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
563        Self::OutputFormatNotSupported {
564            endpoint_name: endpoint_name.to_owned(),
565            error: error.to_owned(),
566        }
567    }
568
569    pub fn input_format_not_specified(endpoint_name: &str) -> Self {
570        Self::InputFormatNotSpecified {
571            endpoint_name: endpoint_name.to_owned(),
572        }
573    }
574
575    pub fn output_format_not_specified(endpoint_name: &str) -> Self {
576        Self::OutputFormatNotSpecified {
577            endpoint_name: endpoint_name.to_owned(),
578        }
579    }
580
581    pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
582        Self::InvalidEncoderConfig {
583            endpoint_name: endpoint_name.to_string(),
584            error: error.to_string(),
585        }
586    }
587
588    pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
589        Self::InvalidParserConfig {
590            endpoint_name: endpoint_name.to_string(),
591            error: error.to_string(),
592        }
593    }
594
595    pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
596        Self::InvalidTransportConfig {
597            endpoint_name: endpoint_name.to_string(),
598            error: error.to_string(),
599        }
600    }
601
602    pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
603        Self::InvalidOutputBufferConfig {
604            endpoint_name: endpoint_name.to_string(),
605            error: error.to_string(),
606        }
607    }
608
609    pub fn cyclic_dependency(cycle: Vec<(String, String)>) -> Self {
610        Self::CyclicDependency { cycle }
611    }
612
613    pub fn empty_start_after(endpoint_name: &str) -> Self {
614        Self::EmptyStartAfter {
615            endpoint_name: endpoint_name.to_string(),
616        }
617    }
618}
619
/// Controller error.
///
/// Reports all errors that arise from operating a streaming pipeline consisting
/// of input adapters, output adapters, and a DBSP circuit, via the controller
/// API.
///
/// Serialized without a variant tag (`#[serde(untagged)]`); variants holding
/// values that are serialized by hand use a dedicated `serialize_with`
/// function.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ControllerError {
    /// I/O error.
    #[serde(serialize_with = "serialize_io_error")]
    IoError {
        /// Describes the context where the error occurred.
        context: String,
        io_error: IoError,
        backtrace: Backtrace,
    },

    /// Error parsing program schema.
    SchemaParseError {
        error: String,
    },

    /// Error validating program schema.
    SchemaValidationError {
        error: String,
    },

    /// Error parsing the checkpoint.
    CheckpointParseError {
        error: String,
    },

    /// The checkpoint does not match the pipeline.
    CheckpointDoesNotMatchPipeline,

    /// Operation cannot be initiated now because the pipeline is being restored
    /// from a checkpoint.
    RestoreInProgress,

    /// Operation cannot be initiated now because the pipeline is bootstrapping new/modified views.
    BootstrapInProgress,

    /// Error in journal metadata.
    StepError(StepError),

    /// Unexpected step number.
    UnexpectedStep {
        actual: Step,
        expected: Step,
    },

    /// Step replay failure.
    ReplayFailure {
        error: String,
    },

    /// Feature is not supported.
    NotSupported {
        error: String,
    },

    /// Error parsing program IR file.
    IrParseError {
        error: String,
    },

    /// Error parsing CLI arguments.
    CliArgsError {
        error: String,
    },

    /// Invalid controller configuration.
    Config {
        config_error: Box<ConfigError>,
    },

    /// Unknown input endpoint name.
    UnknownInputEndpoint {
        endpoint_name: String,
    },

    /// Error creating a user-defined preprocessor
    PreprocessorCreateError {
        endpoint_name: String,
        error: String,
    },

    /// Error creating a user-defined postprocessor
    PostprocessorCreateError {
        endpoint_name: String,
        error: String,
    },

    /// Unknown output endpoint name.
    UnknownOutputEndpoint {
        endpoint_name: String,
    },

    /// Error parsing input data.
    ///
    /// Parser errors are expected to be
    /// recoverable, i.e., the parser should be able to successfully parse
    /// new valid inputs after an error.
    ParseError {
        endpoint_name: String,
        error: Box<ParseError>,
    },

    /// Encode error.
    ///
    /// Error encoding the last output batch.  Encoder errors are expected to
    /// be recoverable, i.e., the encoder should be able to successfully parse
    /// new valid inputs after an error.
    #[serde(serialize_with = "serialize_encode_error")]
    EncodeError {
        endpoint_name: String,
        error: AnyError,
    },

    /// Input transport endpoint error.
    #[serde(serialize_with = "serialize_input_transport_error")]
    InputTransportError {
        endpoint_name: String,
        fatal: bool,
        error: AnyError,
    },

    /// Output transport endpoint error.
    #[serde(serialize_with = "serialize_output_transport_error")]
    OutputTransportError {
        endpoint_name: String,
        fatal: bool,
        error: AnyError,
    },

    /// Command error associated with an endpoint.
    CommandError {
        endpoint_name: String,
        error: String,
    },

    /// Error evaluating the DBSP circuit.
    DbspError {
        error: DbspError,
    },

    /// Error inside the Prometheus module.
    PrometheusError {
        error: String,
    },

    // TODO: we currently don't have a way to include more info about the panic.
    /// Panic inside the DBSP runtime.
    DbspPanic,

    /// Panic inside the DBSP controller.
    ControllerPanic,

    /// Controller terminated before command could be executed.
    ControllerExit,

    /// Storage error.
    #[serde(serialize_with = "serialize_storage_error")]
    StorageError {
        /// Describes the context where the error occurred.
        context: String,
        error: StorageError,
        backtrace: Box<Backtrace>,
    },

    /// Enterprise-only feature.
    EnterpriseFeature(&'static str),

    /// Cannot checkpoint or suspend.
    SuspendError(SuspendError),

    /// An unexpected JSON serialized structure was encountered while processing the /stats endpoint.
    UnexpectedJsonStructure {
        reason: String,
    },

    /// The request relates to an old incarnation of the pipeline.
    PipelineRestarted {
        error: String,
    },

    /// Completion token specified non-existing endpoint id. This indicates that the endpoint was removed
    /// or the token is invalid.
    UnknownEndpointInCompletionToken {
        endpoint_id: u64,
    },

    /// Error fetching checkpoint from remote object storage.
    CheckpointFetchError {
        error: String,
    },

    /// Error pushing checkpoint to remote object storage.
    CheckpointPushError {
        error: String,
    },

    /// A transaction is in progress.
    TransactionInProgress,
    /// No transaction is in progress.
    NoTransactionInProgress,

    /// Invalid initial desired status.
    InvalidInitialStatus(RuntimeDesiredStatus),

    /// Invalid standby configuration.
    InvalidStandby(&'static str),

    /// Invalid startup status transition.
    InvalidStartupTransition {
        from: RuntimeDesiredStatus,
        to: RuntimeDesiredStatus,
    },

    /// Bootstrap was rejected by the user.
    BootstrapRejectedByUser,

    /// Bootstrap is not allowed.
    BootstrapNotAllowed {
        error: String,
    },

    /// Bootstrap was not expected.
    UnexpectedBootstrap {
        bootstrap_info: Option<BootstrapInfo>,
    },

    /// Unexpected runtime version
    UnexpectedRuntimeVersion {
        error: String,
    },
}
850
851impl ResponseError for ControllerError {
852    fn status_code(&self) -> StatusCode {
853        match self {
854            Self::Config { config_error }
855                if matches!(**config_error, ConfigError::UnknownInputStream { .. }) =>
856            {
857                StatusCode::NOT_FOUND
858            }
859            Self::Config { config_error }
860                if matches!(**config_error, ConfigError::UnknownOutputStream { .. }) =>
861            {
862                StatusCode::NOT_FOUND
863            }
864            Self::Config { .. } => StatusCode::BAD_REQUEST,
865            Self::UnknownInputEndpoint { .. } => StatusCode::NOT_FOUND,
866            Self::UnknownOutputEndpoint { .. } => StatusCode::NOT_FOUND,
867            Self::ParseError { .. } => StatusCode::BAD_REQUEST,
868            Self::NotSupported { .. } => StatusCode::BAD_REQUEST,
869            Self::EnterpriseFeature(_) => StatusCode::NOT_IMPLEMENTED,
870            Self::RestoreInProgress => StatusCode::SERVICE_UNAVAILABLE,
871            Self::BootstrapInProgress => StatusCode::SERVICE_UNAVAILABLE,
872            Self::PipelineRestarted { .. } => StatusCode::GONE,
873            Self::UnknownEndpointInCompletionToken { .. } => StatusCode::GONE,
874            Self::TransactionInProgress => StatusCode::CONFLICT,
875            Self::NoTransactionInProgress => StatusCode::BAD_REQUEST,
876            Self::InvalidInitialStatus(_) => StatusCode::GONE,
877            Self::BootstrapRejectedByUser => StatusCode::CONFLICT,
878            Self::UnexpectedBootstrap { .. } => StatusCode::INTERNAL_SERVER_ERROR,
879            Self::UnexpectedRuntimeVersion { .. } => StatusCode::INTERNAL_SERVER_ERROR,
880            _ => StatusCode::INTERNAL_SERVER_ERROR,
881        }
882    }
883
884    fn error_response(&self) -> HttpResponse<BoxBody> {
885        HttpResponseBuilder::new(self.status_code()).json(ErrorResponse::from_error(self))
886    }
887}
888
889fn serialize_io_error<S>(
890    context: &String,
891    io_error: &IoError,
892    backtrace: &Backtrace,
893    serializer: S,
894) -> Result<S::Ok, S::Error>
895where
896    S: Serializer,
897{
898    let mut ser = serializer.serialize_struct("IoError", 4)?;
899    ser.serialize_field("context", context)?;
900    ser.serialize_field("kind", &io_error.kind().to_string())?;
901    ser.serialize_field("os_error", &io_error.raw_os_error())?;
902    ser.serialize_field("backtrace", &backtrace.to_string())?;
903    ser.end()
904}
905
906fn serialize_storage_error<S>(
907    context: &String,
908    error: &StorageError,
909    backtrace: &Backtrace,
910    serializer: S,
911) -> Result<S::Ok, S::Error>
912where
913    S: Serializer,
914{
915    let mut ser = serializer.serialize_struct("StorageError", 4)?;
916    ser.serialize_field("context", context)?;
917    ser.serialize_field("error", &error.to_string())?;
918    ser.serialize_field("backtrace", &backtrace.to_string())?;
919    ser.end()
920}
921
922fn serialize_encode_error<S>(
923    endpoint: &String,
924    error: &AnyError,
925    serializer: S,
926) -> Result<S::Ok, S::Error>
927where
928    S: Serializer,
929{
930    let mut ser = serializer.serialize_struct("EncodeError", 3)?;
931    ser.serialize_field("endpoint_name", endpoint)?;
932    ser.serialize_field("error", &error.to_string())?;
933    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
934    ser.end()
935}
936
937fn serialize_input_transport_error<S>(
938    endpoint: &String,
939    fatal: &bool,
940    error: &AnyError,
941    serializer: S,
942) -> Result<S::Ok, S::Error>
943where
944    S: Serializer,
945{
946    let mut ser = serializer.serialize_struct("InputTransportError", 4)?;
947    ser.serialize_field("endpoint_name", endpoint)?;
948    ser.serialize_field("fatal", fatal)?;
949    ser.serialize_field("error", &error.to_string())?;
950    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
951    ser.end()
952}
953
954fn serialize_output_transport_error<S>(
955    endpoint: &String,
956    fatal: &bool,
957    error: &AnyError,
958    serializer: S,
959) -> Result<S::Ok, S::Error>
960where
961    S: Serializer,
962{
963    let mut ser = serializer.serialize_struct("OutputTransportError", 4)?;
964    ser.serialize_field("endpoint_name", endpoint)?;
965    ser.serialize_field("fatal", fatal)?;
966    ser.serialize_field("error", &error.to_string())?;
967    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
968    ser.end()
969}
970
971impl DbspDetailedError for ControllerError {
972    // TODO: attempts to cast `AnyError` to `DetailedError`.
973    fn error_code(&self) -> Cow<'static, str> {
974        match self {
975            Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
976            Self::PostprocessorCreateError { .. } => Cow::from("PostprocessorCreateError"),
977            Self::IoError { .. } => Cow::from("ControllerIoError"),
978            Self::NotSupported { .. } => Cow::from("NotSupported"),
979            Self::SchemaParseError { .. } => Cow::from("SchemaParseError"),
980            Self::SchemaValidationError { .. } => Cow::from("SchemaParseError"),
981            Self::CheckpointParseError { .. } => Cow::from("CheckpointParseError"),
982            Self::CheckpointDoesNotMatchPipeline => Cow::from("CheckpointDoesNotMatchPipeline"),
983            Self::RestoreInProgress => Cow::from("RestoreInProgress"),
984            Self::BootstrapInProgress => Cow::from("BootstrapInProgress"),
985            Self::StepError { .. } => Cow::from("StepError"),
986            Self::UnexpectedStep { .. } => Cow::from("UnexpectedStep"),
987            Self::ReplayFailure { .. } => Cow::from("ReplayFailure"),
988            Self::IrParseError { .. } => Cow::from("IrParseError"),
989            Self::CliArgsError { .. } => Cow::from("ControllerCliArgsError"),
990            Self::Config { config_error } => {
991                Cow::from(format!("ConfigError.{}", config_error.error_code()))
992            }
993            Self::UnknownInputEndpoint { .. } => Cow::from("UnknownInputEndpoint"),
994            Self::UnknownOutputEndpoint { .. } => Cow::from("UnknownOutputEndpoint"),
995            Self::ParseError { .. } => Cow::from("ParseError"),
996            Self::EncodeError { .. } => Cow::from("EncodeError"),
997            Self::InputTransportError { .. } => Cow::from("InputTransportError"),
998            Self::OutputTransportError { .. } => Cow::from("OutputTransportError"),
999            Self::CommandError { .. } => Cow::from("CommandError"),
1000            Self::PrometheusError { .. } => Cow::from("PrometheusError"),
1001            Self::DbspError { error } => error.error_code(),
1002            Self::DbspPanic => Cow::from("DbspPanic"),
1003            Self::ControllerPanic => Cow::from("ControllerPanic"),
1004            Self::ControllerExit => Cow::from("ControllerExit"),
1005            Self::EnterpriseFeature(_) => Cow::from("EnterpriseFeature"),
1006            Self::StorageError { .. } => Cow::from("StorageError"),
1007            Self::SuspendError(_) => Cow::from("SuspendError"),
1008            Self::UnexpectedJsonStructure { .. } => Cow::from("UnexpectedJsonStructure"),
1009            Self::PipelineRestarted { .. } => Cow::from("PipelineRestarted"),
1010            Self::UnknownEndpointInCompletionToken { .. } => {
1011                Cow::from("UnknownEndpointInCompletionToken")
1012            }
1013            Self::CheckpointFetchError { .. } => Cow::from("CheckpointFetchError"),
1014            Self::CheckpointPushError { .. } => Cow::from("CheckpointPushError"),
1015            Self::TransactionInProgress => Cow::from("TransactionInProgress"),
1016            Self::NoTransactionInProgress => Cow::from("NoTransactionInProgress"),
1017            Self::InvalidInitialStatus(_) => Cow::from("InvalidInitialStatus"),
1018            Self::InvalidStandby(_) => Cow::from("InvalidStandby"),
1019            Self::InvalidStartupTransition { .. } => Cow::from("InvalidStartupTransition"),
1020            Self::BootstrapRejectedByUser => Cow::from("BootstrapRejected"),
1021            Self::BootstrapNotAllowed { .. } => Cow::from("BootstrapNotAllowed"),
1022            Self::UnexpectedBootstrap { .. } => Cow::from("UnexpectedBootstrap"),
1023            Self::UnexpectedRuntimeVersion { .. } => Cow::from("UnexpectedRuntimeVersion"),
1024        }
1025    }
1026}
1027
1028impl DetailedError for ControllerError {
1029    // TODO: attempts to cast `AnyError` to `DetailedError`.
1030    fn error_code(&self) -> Cow<'static, str> {
1031        DbspDetailedError::error_code(self)
1032    }
1033}
1034
1035impl StdError for ControllerError {}
1036
1037impl Display for ControllerError {
1038    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
1039        match self {
1040            Self::PreprocessorCreateError {
1041                endpoint_name,
1042                error,
1043            } => {
1044                write!(
1045                    f,
1046                    "Error creating preprocessor for endpoint {endpoint_name}: {error}"
1047                )
1048            }
1049            Self::PostprocessorCreateError {
1050                endpoint_name,
1051                error,
1052            } => {
1053                write!(
1054                    f,
1055                    "Error creating postprocessor for endpoint {endpoint_name}: {error}"
1056                )
1057            }
1058            Self::IoError {
1059                context, io_error, ..
1060            } => {
1061                write!(f, "I/O error {context}: {io_error}")
1062            }
1063            Self::NotSupported { error } => {
1064                write!(f, "Not supported: {error}")
1065            }
1066            Self::SchemaParseError { error } => {
1067                write!(f, "Error parsing program schema: {error}")
1068            }
1069            Self::SchemaValidationError { error } => {
1070                write!(f, "Error validating program schema: {error}")
1071            }
1072            Self::CheckpointParseError { error } => {
1073                write!(f, "Error parsing checkpoint file: {error}")
1074            }
1075            Self::CheckpointDoesNotMatchPipeline => {
1076                write!(
1077                    f,
1078                    "Recovery failed: the pipeline has been recovered from a checkpoint, but the checkpoint does not match the current pipeline definition. This can be caused by a corrupted checkpoint or an internal error."
1079                )
1080            }
1081            Self::RestoreInProgress => {
1082                write!(
1083                    f,
1084                    "Operation cannot be initiated now because the pipeline is restoring from a checkpoint."
1085                )
1086            }
1087            Self::BootstrapInProgress => {
1088                write!(
1089                    f,
1090                    "Operation cannot be initiated while the pipeline is bootstrapping."
1091                )
1092            }
1093            Self::StepError(error) => write!(f, "Error with persistent input steps: {error}"),
1094            Self::UnexpectedStep { actual, expected } => {
1095                write!(f, "Read step {actual}, expected {expected}")
1096            }
1097            Self::ReplayFailure { error } => {
1098                write!(f, "{error}")
1099            }
1100            Self::IrParseError { error } => {
1101                write!(f, "Error parsing program IR: {error}")
1102            }
1103            Self::CliArgsError { error } => {
1104                write!(f, "Error parsing command line arguments: {error}")
1105            }
1106            Self::Config { config_error } => {
1107                write!(f, "invalid controller configuration: {config_error}")
1108            }
1109            Self::UnknownInputEndpoint { endpoint_name } => {
1110                write!(f, "unknown input endpoint name '{endpoint_name}'")
1111            }
1112            Self::UnknownOutputEndpoint { endpoint_name } => {
1113                write!(f, "unknown output endpoint name '{endpoint_name}'")
1114            }
1115            Self::InputTransportError {
1116                endpoint_name,
1117                fatal,
1118                error,
1119            } => {
1120                write!(
1121                    f,
1122                    "{}error on input endpoint '{endpoint_name}': {}",
1123                    if *fatal { "FATAL " } else { "" },
1124                    error.root_cause()
1125                )
1126            }
1127            Self::OutputTransportError {
1128                endpoint_name,
1129                fatal,
1130                error,
1131            } => {
1132                write!(
1133                    f,
1134                    "{}error on output endpoint '{endpoint_name}': {}",
1135                    if *fatal { "FATAL " } else { "" },
1136                    error.root_cause()
1137                )
1138            }
1139            Self::CommandError {
1140                endpoint_name,
1141                error,
1142            } => {
1143                write!(
1144                    f,
1145                    "error executing command on output endpoint '{endpoint_name}': {error}"
1146                )
1147            }
1148            Self::ParseError {
1149                endpoint_name,
1150                error,
1151            } => {
1152                write!(
1153                    f,
1154                    "parse error on input endpoint '{endpoint_name}': {error}"
1155                )
1156            }
1157            Self::EncodeError {
1158                endpoint_name,
1159                error,
1160            } => {
1161                write!(
1162                    f,
1163                    "encoder error on output endpoint '{endpoint_name}': {error}"
1164                )
1165            }
1166            Self::PrometheusError { error } => {
1167                write!(f, "Error in the Prometheus metrics module: '{error}'")
1168            }
1169            Self::DbspError { error } => {
1170                write!(f, "DBSP error: {error}")
1171            }
1172            Self::DbspPanic => {
1173                write!(f, "Panic inside the DBSP runtime")
1174            }
1175            Self::ControllerPanic => {
1176                write!(f, "Panic inside the DBSP controller")
1177            }
1178            Self::ControllerExit => {
1179                write!(f, "Controller exited before command could be executed")
1180            }
1181            Self::EnterpriseFeature(feature) => {
1182                write!(
1183                    f,
1184                    "Cannot use enterprise-only feature ({feature}) in Feldera community edition."
1185                )
1186            }
1187            Self::StorageError { context, error, .. } => {
1188                write!(f, "I/O error {context}: {error}")
1189            }
1190            Self::SuspendError(error) => write!(f, "{error}"),
1191            Self::UnexpectedJsonStructure { reason } => {
1192                write!(f, "An unexpected JSON structure was detected: {reason}")
1193            }
1194            Self::PipelineRestarted { error } => {
1195                write!(f, "{error}")
1196            }
1197            Self::UnknownEndpointInCompletionToken { endpoint_id } => {
1198                write!(
1199                    f,
1200                    "completion token specifies input endpoint id {endpoint_id}, which doesn't exist; this indicates that the input connector was deleted after the completion token was generated"
1201                )
1202            }
1203            Self::CheckpointFetchError { error } => {
1204                write!(f, "Error fetching checkpoint from object store: {error}")
1205            }
1206            Self::CheckpointPushError { error } => {
1207                write!(f, "Error pushing checkpoint to object store: {error}")
1208            }
1209            Self::TransactionInProgress => {
1210                write!(
1211                    f,
1212                    "Cannot perform this operation while there is a transaction in progress"
1213                )
1214            }
1215            Self::NoTransactionInProgress => {
1216                write!(
1217                    f,
1218                    "This operation requires an active transaction, but none is currently in progress"
1219                )
1220            }
1221            Self::InvalidInitialStatus(status) => {
1222                write!(
1223                    f,
1224                    "Invalid initial status {status:?} provided on command line or read from storage (only coordination, running, paused, and standby are valid)"
1225                )
1226            }
1227            Self::InvalidStandby(standby) => {
1228                write!(f, "Cannot enter standby mode: {standby}")
1229            }
1230            Self::InvalidStartupTransition { from, to } => {
1231                write!(
1232                    f,
1233                    "Invalid pipeline startup transition from {from:?} to {to:?}"
1234                )
1235            }
1236            Self::BootstrapRejectedByUser => {
1237                write!(
1238                    f,
1239                    "Bootstrapping of the modified pipeline was rejected by the user."
1240                )
1241            }
1242            Self::BootstrapNotAllowed { error } => {
1243                write!(
1244                    f,
1245                    "The pipeline has been modified since the last checkpoint; however it cannot be bootstrapped due to the following reasons:\n{error}"
1246                )
1247            }
1248            Self::UnexpectedBootstrap { bootstrap_info } => {
1249                write!(
1250                    f,
1251                    "The pipeline's query plan was modified since the last checkpoint and requires bootstrapping; however we were not able to detect the changes in the pipeline metadata. This is a bug; please report to the developers. Bootstrap info: {bootstrap_info:?}"
1252                )
1253            }
1254            Self::UnexpectedRuntimeVersion { error } => {
1255                write!(f, "{error}")
1256            }
1257        }
1258    }
1259}
1260
1261impl ControllerError {
1262    pub fn io_error(context: impl Display, io_error: IoError) -> Self {
1263        Self::IoError {
1264            context: context.to_string(),
1265            io_error,
1266            backtrace: Backtrace::capture(),
1267        }
1268    }
1269
1270    pub fn not_supported(error: &str) -> Self {
1271        Self::NotSupported {
1272            error: error.to_string(),
1273        }
1274    }
1275
1276    pub fn schema_parse_error(error: &str) -> Self {
1277        Self::SchemaParseError {
1278            error: error.to_string(),
1279        }
1280    }
1281
1282    pub fn checkpoint_does_not_match_pipeline() -> Self {
1283        Self::CheckpointDoesNotMatchPipeline
1284    }
1285
1286    pub fn checkpoint_fetch_error(error: String) -> Self {
1287        Self::CheckpointFetchError { error }
1288    }
1289
1290    pub fn checkpoint_push_error(error: String) -> Self {
1291        Self::CheckpointPushError { error }
1292    }
1293
1294    pub fn schema_validation_error(error: &str) -> Self {
1295        Self::SchemaValidationError {
1296            error: error.to_string(),
1297        }
1298    }
1299
1300    pub fn ir_parse_error(error: &str) -> Self {
1301        Self::IrParseError {
1302            error: error.to_string(),
1303        }
1304    }
1305
1306    pub fn cli_args_error<E>(error: &E) -> Self
1307    where
1308        E: ToString,
1309    {
1310        Self::CliArgsError {
1311            error: error.to_string(),
1312        }
1313    }
1314
1315    pub fn unknown_input_endpoint(endpoint_name: &str) -> Self {
1316        Self::UnknownInputEndpoint {
1317            endpoint_name: endpoint_name.to_string(),
1318        }
1319    }
1320
1321    pub fn unknown_output_endpoint(endpoint_name: &str) -> Self {
1322        Self::UnknownOutputEndpoint {
1323            endpoint_name: endpoint_name.to_string(),
1324        }
1325    }
1326
1327    pub fn pipeline_config_parse_error<E>(error: &E) -> Self
1328    where
1329        E: ToString,
1330    {
1331        Self::Config {
1332            config_error: Box::new(ConfigError::pipeline_config_parse_error(error)),
1333        }
1334    }
1335
1336    pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1337    where
1338        E: ToString,
1339    {
1340        Self::Config {
1341            config_error: Box::new(ConfigError::parser_config_parse_error(
1342                endpoint_name,
1343                error,
1344                config,
1345            )),
1346        }
1347    }
1348
1349    pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1350    where
1351        E: ToString,
1352    {
1353        Self::Config {
1354            config_error: Box::new(ConfigError::encoder_config_parse_error(
1355                endpoint_name,
1356                error,
1357                config,
1358            )),
1359        }
1360    }
1361
1362    pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
1363        Self::Config {
1364            config_error: Box::new(ConfigError::duplicate_input_endpoint(endpoint_name)),
1365        }
1366    }
1367
1368    pub fn duplicate_input_stream(stream_name: &str) -> Self {
1369        Self::Config {
1370            config_error: Box::new(ConfigError::duplicate_input_stream(stream_name)),
1371        }
1372    }
1373
1374    pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
1375        Self::Config {
1376            config_error: Box::new(ConfigError::unknown_input_format(
1377                endpoint_name,
1378                format_name,
1379            )),
1380        }
1381    }
1382
1383    pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
1384        Self::Config {
1385            config_error: Box::new(ConfigError::unknown_input_transport(
1386                endpoint_name,
1387                transport_name,
1388            )),
1389        }
1390    }
1391
1392    pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
1393        Self::Config {
1394            config_error: Box::new(ConfigError::duplicate_output_endpoint(endpoint_name)),
1395        }
1396    }
1397
1398    pub fn duplicate_output_stream(stream_name: &str) -> Self {
1399        Self::Config {
1400            config_error: Box::new(ConfigError::duplicate_output_stream(stream_name)),
1401        }
1402    }
1403
1404    pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
1405        Self::Config {
1406            config_error: Box::new(ConfigError::unknown_output_format(
1407                endpoint_name,
1408                format_name,
1409            )),
1410        }
1411    }
1412
1413    pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
1414        Self::Config {
1415            config_error: Box::new(ConfigError::unknown_output_transport(
1416                endpoint_name,
1417                transport_name,
1418            )),
1419        }
1420    }
1421
1422    pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
1423        Self::Config {
1424            config_error: Box::new(ConfigError::unknown_input_stream(
1425                endpoint_name,
1426                stream_name,
1427            )),
1428        }
1429    }
1430
1431    pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
1432        Self::Config {
1433            config_error: Box::new(ConfigError::unknown_output_stream(
1434                endpoint_name,
1435                stream_name,
1436            )),
1437        }
1438    }
1439
1440    pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
1441        Self::Config {
1442            config_error: Box::new(ConfigError::unknown_index(endpoint_name, index_name)),
1443        }
1444    }
1445
1446    pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
1447        Self::Config {
1448            config_error: Box::new(ConfigError::not_an_index(endpoint_name, index_name)),
1449        }
1450    }
1451
1452    pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1453        Self::Config {
1454            config_error: Box::new(ConfigError::input_format_not_supported(
1455                endpoint_name,
1456                error,
1457            )),
1458        }
1459    }
1460
1461    pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1462        Self::Config {
1463            config_error: Box::new(ConfigError::output_format_not_supported(
1464                endpoint_name,
1465                error,
1466            )),
1467        }
1468    }
1469
1470    pub fn input_format_not_specified(endpoint_name: &str) -> Self {
1471        Self::Config {
1472            config_error: Box::new(ConfigError::input_format_not_specified(endpoint_name)),
1473        }
1474    }
1475
1476    pub fn output_format_not_specified(endpoint_name: &str) -> Self {
1477        Self::Config {
1478            config_error: Box::new(ConfigError::output_format_not_specified(endpoint_name)),
1479        }
1480    }
1481
1482    pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
1483        Self::Config {
1484            config_error: Box::new(ConfigError::invalid_encoder_configuration(
1485                endpoint_name,
1486                error,
1487            )),
1488        }
1489    }
1490
1491    pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
1492        Self::Config {
1493            config_error: Box::new(ConfigError::invalid_parser_configuration(
1494                endpoint_name,
1495                error,
1496            )),
1497        }
1498    }
1499
1500    pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
1501        Self::Config {
1502            config_error: Box::new(ConfigError::invalid_transport_configuration(
1503                endpoint_name,
1504                error,
1505            )),
1506        }
1507    }
1508
1509    pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
1510        Self::Config {
1511            config_error: Box::new(ConfigError::invalid_output_buffer_configuration(
1512                endpoint_name,
1513                error,
1514            )),
1515        }
1516    }
1517
1518    pub fn input_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1519        Self::InputTransportError {
1520            endpoint_name: endpoint_name.to_owned(),
1521            fatal,
1522            error,
1523        }
1524    }
1525
1526    pub fn output_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1527        Self::OutputTransportError {
1528            endpoint_name: endpoint_name.to_owned(),
1529            fatal,
1530            error,
1531        }
1532    }
1533
1534    pub fn command_error(endpoint_name: &str, error: &str) -> Self {
1535        Self::CommandError {
1536            endpoint_name: endpoint_name.to_owned(),
1537            error: error.to_string(),
1538        }
1539    }
1540
1541    pub fn parse_error(endpoint_name: &str, error: ParseError) -> Self {
1542        Self::ParseError {
1543            endpoint_name: endpoint_name.to_owned(),
1544            error: Box::new(error),
1545        }
1546    }
1547
1548    pub fn encode_error(endpoint_name: &str, error: AnyError) -> Self {
1549        Self::EncodeError {
1550            endpoint_name: endpoint_name.to_owned(),
1551            error,
1552        }
1553    }
1554
1555    pub fn prometheus_error<E>(error: &E) -> Self
1556    where
1557        E: ToString,
1558    {
1559        Self::PrometheusError {
1560            error: error.to_string(),
1561        }
1562    }
1563
1564    pub fn dbsp_error(error: DbspError) -> Self {
1565        Self::DbspError { error }
1566    }
1567
1568    pub fn dbsp_panic() -> Self {
1569        Self::DbspPanic
1570    }
1571
1572    pub fn controller_panic() -> Self {
1573        Self::ControllerPanic
1574    }
1575
1576    pub fn storage_error(context: impl Display, error: StorageError) -> Self {
1577        Self::StorageError {
1578            context: context.to_string(),
1579            error,
1580            backtrace: Box::new(Backtrace::capture()),
1581        }
1582    }
1583
1584    pub fn pipeline_restarted(error: &str) -> Self {
1585        Self::PipelineRestarted {
1586            error: error.to_string(),
1587        }
1588    }
1589
1590    pub fn unknown_endpoint_in_completion_token(endpoint_id: u64) -> Self {
1591        Self::UnknownEndpointInCompletionToken { endpoint_id }
1592    }
1593
1594    pub fn kind(&self) -> ErrorKind {
1595        match self {
1596            Self::IoError { io_error, .. } => io_error.kind(),
1597            Self::StorageError { error, .. } => error.kind(),
1598            Self::StepError(error) => error.kind(),
1599            Self::RestoreInProgress
1600            | Self::BootstrapInProgress
1601            | Self::SuspendError(SuspendError::Temporary(_))
1602            | Self::TransactionInProgress => ErrorKind::ResourceBusy,
1603            Self::NotSupported { .. } | Self::SuspendError(SuspendError::Permanent(_)) => {
1604                ErrorKind::Unsupported
1605            }
1606            Self::DbspError {
1607                error: DbspError::IO(error),
1608            } => error.kind(),
1609            Self::DbspError { .. }
1610            | Self::SchemaParseError { .. }
1611            | Self::SchemaValidationError { .. }
1612            | Self::CheckpointParseError { .. }
1613            | Self::CheckpointDoesNotMatchPipeline
1614            | Self::UnexpectedStep { .. }
1615            | Self::ReplayFailure { .. }
1616            | Self::IrParseError { .. }
1617            | Self::CliArgsError { .. }
1618            | Self::Config { .. }
1619            | Self::UnknownInputEndpoint { .. }
1620            | Self::UnknownOutputEndpoint { .. }
1621            | Self::ParseError { .. }
1622            | Self::EncodeError { .. }
1623            | Self::InputTransportError { .. }
1624            | Self::OutputTransportError { .. }
1625            | Self::CommandError { .. }
1626            | Self::PrometheusError { .. }
1627            | Self::DbspPanic
1628            | Self::ControllerPanic
1629            | Self::ControllerExit
1630            | Self::EnterpriseFeature(_)
1631            | Self::UnexpectedJsonStructure { .. }
1632            | Self::UnknownEndpointInCompletionToken { .. }
1633            | Self::CheckpointFetchError { .. }
1634            | Self::CheckpointPushError { .. }
1635            | Self::PipelineRestarted { .. }
1636            | Self::NoTransactionInProgress
1637            | Self::InvalidInitialStatus(_)
1638            | Self::BootstrapRejectedByUser
1639            | Self::BootstrapNotAllowed { .. }
1640            | Self::UnexpectedBootstrap { .. }
1641            | Self::InvalidStandby(_)
1642            | Self::InvalidStartupTransition { .. }
1643            | Self::PreprocessorCreateError { .. }
1644            | Self::PostprocessorCreateError { .. }
1645            | Self::UnexpectedRuntimeVersion { .. } => ErrorKind::Other,
1646        }
1647    }
1648}
1649
1650impl From<ConfigError> for ControllerError {
1651    fn from(config_error: ConfigError) -> Self {
1652        Self::Config {
1653            config_error: Box::new(config_error),
1654        }
1655    }
1656}
1657
1658impl From<DbspError> for ControllerError {
1659    fn from(error: DbspError) -> Self {
1660        Self::DbspError { error }
1661    }
1662}
1663
1664impl From<SuspendError> for ControllerError {
1665    fn from(error: SuspendError) -> Self {
1666        Self::SuspendError(error)
1667    }
1668}