Skip to main content

feldera_adapterlib/errors/
controller.rs

1use std::{
2    backtrace::Backtrace,
3    borrow::Cow,
4    error::Error as StdError,
5    fmt::{Display, Error as FmtError, Formatter},
6    io::{Error as IoError, ErrorKind},
7    string::ToString,
8};
9
10use actix_web::body::BoxBody;
11use actix_web::http::StatusCode;
12use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError};
13use anyhow::Error as AnyError;
14use dbsp::{
15    Error as DbspError,
16    circuit::{LayoutError, circuit_builder::BootstrapInfo},
17    storage::backend::StorageError,
18};
19use feldera_types::{
20    error::{DetailedError, ErrorResponse},
21    runtime_status::RuntimeDesiredStatus,
22    suspend::SuspendError,
23};
24use serde::{Serialize, Serializer, ser::SerializeStruct};
25
26use super::journal::StepError;
27use crate::{DbspDetailedError, format::ParseError, transport::Step};
28
29/// Controller configuration error.
30#[derive(Debug, Serialize)]
31#[serde(untagged)]
32pub enum ConfigError {
33    /// Failed to parse pipeline configuration.
34    PipelineConfigParseError {
35        error: String,
36    },
37
38    /// Failed to parse parser configuration for an endpoint.
39    ParserConfigParseError {
40        endpoint_name: String,
41        error: String,
42        config: String,
43    },
44
45    /// Failed to parse encoder configuration for an endpoint.
46    EncoderConfigParseError {
47        endpoint_name: String,
48        error: String,
49        config: String,
50    },
51
52    /// Input endpoint with this name already exists.
53    DuplicateInputEndpoint {
54        endpoint_name: String,
55    },
56
57    /// Input table with this name already exists.
58    DuplicateInputStream {
59        stream_name: String,
60    },
61
62    /// Output endpoint with this name already exists.
63    DuplicateOutputEndpoint {
64        endpoint_name: String,
65    },
66
67    /// Output view with this name already exists.
68    DuplicateOutputStream {
69        stream_name: String,
70    },
71
72    /// Endpoint configuration specifies unknown input format name.
73    UnknownInputFormat {
74        endpoint_name: String,
75        format_name: String,
76    },
77
78    /// Endpoint configuration specifies unknown output format name.
79    UnknownOutputFormat {
80        endpoint_name: String,
81        format_name: String,
82    },
83
84    /// Endpoint configuration specifies unknown input transport name.
85    UnknownInputTransport {
86        endpoint_name: String,
87        transport_name: String,
88    },
89
90    /// Endpoint configuration specifies unknown output transport name.
91    UnknownOutputTransport {
92        endpoint_name: String,
93        transport_name: String,
94    },
95
96    /// Endpoint configuration specifies an input stream name
97    /// that is not found in the circuit catalog.
98    UnknownInputStream {
99        endpoint_name: String,
100        stream_name: String,
101    },
102
103    /// Endpoint configuration specifies an output stream name
104    /// that is not found in the circuit catalog.
105    UnknownOutputStream {
106        endpoint_name: String,
107        stream_name: String,
108    },
109
110    /// Endpoint configuration specifies an index name
111    /// that is not found in the circuit catalog.
112    UnknownIndex {
113        endpoint_name: String,
114        index_name: String,
115    },
116
117    NotAnIndex {
118        endpoint_name: String,
119        index_name: String,
120    },
121
122    InputFormatNotSupported {
123        endpoint_name: String,
124        error: String,
125    },
126
127    OutputFormatNotSupported {
128        endpoint_name: String,
129        error: String,
130    },
131
132    InputFormatNotSpecified {
133        endpoint_name: String,
134    },
135
136    OutputFormatNotSpecified {
137        endpoint_name: String,
138    },
139
140    InvalidEncoderConfig {
141        endpoint_name: String,
142        error: String,
143    },
144
145    InvalidParserConfig {
146        endpoint_name: String,
147        error: String,
148    },
149
150    InvalidTransportConfig {
151        endpoint_name: String,
152        error: String,
153    },
154
155    InvalidOutputBufferConfig {
156        endpoint_name: String,
157        error: String,
158    },
159
160    CyclicDependency {
161        cycle: Vec<(String, String)>,
162    },
163
164    EmptyStartAfter {
165        endpoint_name: String,
166    },
167
168    FtRequiresStorage,
169    FtRequiresFtInput,
170
171    InvalidLayout(LayoutError),
172}
173
174impl StdError for ConfigError {}
175
176impl DbspDetailedError for ConfigError {
177    fn error_code(&self) -> Cow<'static, str> {
178        match self {
179            Self::PipelineConfigParseError { .. } => Cow::from("PipelineConfigParseError"),
180            Self::ParserConfigParseError { .. } => Cow::from("ParserConfigParseError"),
181            Self::EncoderConfigParseError { .. } => Cow::from("EncoderConfigParseError"),
182            Self::DuplicateInputEndpoint { .. } => Cow::from("DuplicateInputEndpoint"),
183            Self::DuplicateInputStream { .. } => Cow::from("DuplicateInputStream"),
184            Self::DuplicateOutputEndpoint { .. } => Cow::from("DuplicateOutputEndpoint"),
185            Self::DuplicateOutputStream { .. } => Cow::from("DuplicateOutputStream"),
186            Self::UnknownInputFormat { .. } => Cow::from("UnknownInputFormat"),
187            Self::UnknownOutputFormat { .. } => Cow::from("UnknownOutputFormat"),
188            Self::UnknownInputTransport { .. } => Cow::from("UnknownInputTransport"),
189            Self::UnknownOutputTransport { .. } => Cow::from("UnknownOutputTransport"),
190            Self::UnknownInputStream { .. } => Cow::from("UnknownInputStream"),
191            Self::UnknownOutputStream { .. } => Cow::from("UnknownOutputStream"),
192            Self::UnknownIndex { .. } => Cow::from("UnknownIndex"),
193            Self::NotAnIndex { .. } => Cow::from("NotAnIndex"),
194            Self::InputFormatNotSupported { .. } => Cow::from("InputFormatNotSupported"),
195            Self::OutputFormatNotSupported { .. } => Cow::from("OutputFormatNotSupported"),
196            Self::InputFormatNotSpecified { .. } => Cow::from("InputFormatNotSpecified"),
197            Self::OutputFormatNotSpecified { .. } => Cow::from("OutputFormatNotSpecified"),
198            Self::InvalidEncoderConfig { .. } => Cow::from("InvalidEncoderConfig"),
199            Self::InvalidParserConfig { .. } => Cow::from("InvalidParserConfig"),
200            Self::InvalidTransportConfig { .. } => Cow::from("InvalidTransportConfig"),
201            Self::InvalidOutputBufferConfig { .. } => Cow::from("InvalidOutputBufferConfig"),
202            Self::FtRequiresStorage => Cow::from("FtRequiresStorage"),
203            Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"),
204            Self::CyclicDependency { .. } => Cow::from("CyclicDependency"),
205            Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"),
206            Self::InvalidLayout(_) => Cow::from("LayoutError"),
207        }
208    }
209}
210
211impl Display for ConfigError {
212    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
213        match self {
214            Self::PipelineConfigParseError { error } => {
215                write!(f, "Failed to parse pipeline configuration: {error}")
216            }
217            Self::ParserConfigParseError {
218                endpoint_name,
219                error,
220                config,
221            } => {
222                write!(
223                    f,
224                    "Error parsing format configuration for input endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
225                )
226            }
227            Self::EncoderConfigParseError {
228                endpoint_name,
229                error,
230                config,
231            } => {
232                write!(
233                    f,
234                    "Error parsing format configuration for output endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
235                )
236            }
237            Self::DuplicateInputEndpoint { endpoint_name } => {
238                write!(f, "Input endpoint '{endpoint_name}' already exists")
239            }
240            Self::DuplicateInputStream { stream_name } => {
241                write!(f, "Duplicate table name '{stream_name}'")
242            }
243            Self::UnknownInputFormat {
244                endpoint_name,
245                format_name,
246            } => {
247                write!(
248                    f,
249                    "Input endpoint '{endpoint_name}' specifies unknown input format '{format_name}'"
250                )
251            }
252            Self::UnknownInputTransport {
253                endpoint_name,
254                transport_name,
255            } => {
256                write!(
257                    f,
258                    "Input endpoint '{endpoint_name}' specifies unknown input transport '{transport_name}'"
259                )
260            }
261            Self::DuplicateOutputEndpoint { endpoint_name } => {
262                write!(f, "Output endpoint '{endpoint_name}' already exists")
263            }
264            Self::DuplicateOutputStream { stream_name } => {
265                write!(f, "Duplicate table or view name '{stream_name}'")
266            }
267            Self::UnknownOutputFormat {
268                endpoint_name,
269                format_name,
270            } => {
271                write!(
272                    f,
273                    "Output endpoint '{endpoint_name}' specifies unknown output format '{format_name}'"
274                )
275            }
276            Self::UnknownOutputTransport {
277                endpoint_name,
278                transport_name,
279            } => {
280                write!(
281                    f,
282                    "Output endpoint '{endpoint_name}' specifies unknown output transport '{transport_name}'"
283                )
284            }
285            Self::UnknownInputStream {
286                endpoint_name,
287                stream_name,
288            } => {
289                write!(
290                    f,
291                    "Input endpoint '{endpoint_name}' specifies unknown table '{stream_name}'"
292                )
293            }
294            Self::UnknownOutputStream {
295                endpoint_name,
296                stream_name,
297            } => {
298                write!(
299                    f,
300                    "Output endpoint '{endpoint_name}' specifies unknown output table or view '{stream_name}'"
301                )
302            }
303            Self::UnknownIndex {
304                endpoint_name,
305                index_name,
306            } => {
307                write!(
308                    f,
309                    "Output endpoint '{endpoint_name}' specifies index name '{index_name}'; however, the '{index_name}' relation is not an index"
310                )
311            }
312
313            Self::NotAnIndex {
314                endpoint_name,
315                index_name,
316            } => {
317                write!(
318                    f,
319                    "Output endpoint '{endpoint_name}' specifies unknown index '{index_name}'"
320                )
321            }
322
323            Self::InputFormatNotSupported {
324                endpoint_name,
325                error,
326            } => {
327                write!(
328                    f,
329                    "Format not supported on input endpoint '{endpoint_name}': {error}"
330                )
331            }
332            Self::OutputFormatNotSupported {
333                endpoint_name,
334                error,
335            } => {
336                write!(
337                    f,
338                    "Format not supported on output endpoint '{endpoint_name}': {error}"
339                )
340            }
341            Self::InputFormatNotSpecified { endpoint_name } => {
342                write!(
343                    f,
344                    "Data format is not specified for input endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
345                )
346            }
347            Self::OutputFormatNotSpecified { endpoint_name } => {
348                write!(
349                    f,
350                    "Data format is not specified for output endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
351                )
352            }
353            Self::InvalidEncoderConfig {
354                endpoint_name,
355                error,
356            } => {
357                write!(
358                    f,
359                    "invalid format configuration for output endpoint '{endpoint_name}': {error}"
360                )
361            }
362            Self::InvalidParserConfig {
363                endpoint_name,
364                error,
365            } => {
366                write!(
367                    f,
368                    "invalid format configuration for input endpoint '{endpoint_name}': {error}"
369                )
370            }
371            Self::InvalidTransportConfig {
372                endpoint_name,
373                error,
374            } => {
375                write!(
376                    f,
377                    "invalid transport configuration for endpoint '{endpoint_name}': {error}"
378                )
379            }
380            Self::InvalidOutputBufferConfig {
381                endpoint_name,
382                error,
383            } => {
384                write!(
385                    f,
386                    "invalid output buffer configuration for endpoint '{endpoint_name}': {error}"
387                )
388            }
389            Self::CyclicDependency { cycle } => {
390                let mut cycle = cycle.clone();
391                cycle.push(cycle[0].clone());
392                let tail = cycle[1..]
393                    .iter()
394                    .map(|(endpoint, label)| {
395                        format!("waits for endpoint '{endpoint}' with label '{label}'")
396                    })
397                    .collect::<Vec<_>>()
398                    .join(", which ");
399                write!(
400                    f,
401                    "cyclic 'start_after' dependency detected: endpoint '{}' with label '{}' {}",
402                    cycle[0].0, cycle[0].1, tail
403                )
404            }
405            Self::EmptyStartAfter { endpoint_name } => {
406                write!(
407                    f,
408                    "empty 'start_after' field for input endpoint '{}'",
409                    endpoint_name
410                )
411            }
412            Self::FtRequiresStorage => write!(
413                f,
414                "Fault tolerance is configured, which requires storage, but storage is not enabled"
415            ),
416            Self::FtRequiresFtInput => write!(
417                f,
418                "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter"
419            ),
420            Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"),
421        }
422    }
423}
424
425impl ConfigError {
426    pub fn pipeline_config_parse_error<E>(error: &E) -> Self
427    where
428        E: ToString,
429    {
430        Self::PipelineConfigParseError {
431            error: error.to_string(),
432        }
433    }
434
435    pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
436    where
437        E: ToString,
438    {
439        Self::ParserConfigParseError {
440            endpoint_name: endpoint_name.to_owned(),
441            error: error.to_string(),
442            config: config.to_string(),
443        }
444    }
445
446    pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
447    where
448        E: ToString,
449    {
450        Self::EncoderConfigParseError {
451            endpoint_name: endpoint_name.to_owned(),
452            error: error.to_string(),
453            config: config.to_string(),
454        }
455    }
456
457    pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
458        Self::DuplicateInputEndpoint {
459            endpoint_name: endpoint_name.to_owned(),
460        }
461    }
462
463    pub fn duplicate_input_stream(stream_name: &str) -> Self {
464        Self::DuplicateInputStream {
465            stream_name: stream_name.to_owned(),
466        }
467    }
468
469    pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
470        Self::UnknownInputFormat {
471            endpoint_name: endpoint_name.to_owned(),
472            format_name: format_name.to_owned(),
473        }
474    }
475
476    pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
477        Self::UnknownInputTransport {
478            endpoint_name: endpoint_name.to_owned(),
479            transport_name: transport_name.to_owned(),
480        }
481    }
482
483    pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
484        Self::DuplicateOutputEndpoint {
485            endpoint_name: endpoint_name.to_owned(),
486        }
487    }
488
489    pub fn duplicate_output_stream(stream_name: &str) -> Self {
490        Self::DuplicateOutputStream {
491            stream_name: stream_name.to_owned(),
492        }
493    }
494
495    pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
496        Self::UnknownOutputFormat {
497            endpoint_name: endpoint_name.to_owned(),
498            format_name: format_name.to_owned(),
499        }
500    }
501
502    pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
503        Self::UnknownOutputTransport {
504            endpoint_name: endpoint_name.to_owned(),
505            transport_name: transport_name.to_owned(),
506        }
507    }
508
509    pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
510        Self::UnknownInputStream {
511            endpoint_name: endpoint_name.to_owned(),
512            stream_name: stream_name.to_owned(),
513        }
514    }
515
516    pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
517        Self::UnknownOutputStream {
518            endpoint_name: endpoint_name.to_owned(),
519            stream_name: stream_name.to_owned(),
520        }
521    }
522
523    pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
524        Self::UnknownIndex {
525            endpoint_name: endpoint_name.to_owned(),
526            index_name: index_name.to_owned(),
527        }
528    }
529
530    pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
531        Self::NotAnIndex {
532            endpoint_name: endpoint_name.to_owned(),
533            index_name: index_name.to_owned(),
534        }
535    }
536
537    pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
538        Self::InputFormatNotSupported {
539            endpoint_name: endpoint_name.to_owned(),
540            error: error.to_owned(),
541        }
542    }
543
544    pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
545        Self::OutputFormatNotSupported {
546            endpoint_name: endpoint_name.to_owned(),
547            error: error.to_owned(),
548        }
549    }
550
551    pub fn input_format_not_specified(endpoint_name: &str) -> Self {
552        Self::InputFormatNotSpecified {
553            endpoint_name: endpoint_name.to_owned(),
554        }
555    }
556
557    pub fn output_format_not_specified(endpoint_name: &str) -> Self {
558        Self::OutputFormatNotSpecified {
559            endpoint_name: endpoint_name.to_owned(),
560        }
561    }
562
563    pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
564        Self::InvalidEncoderConfig {
565            endpoint_name: endpoint_name.to_string(),
566            error: error.to_string(),
567        }
568    }
569
570    pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
571        Self::InvalidParserConfig {
572            endpoint_name: endpoint_name.to_string(),
573            error: error.to_string(),
574        }
575    }
576
577    pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
578        Self::InvalidTransportConfig {
579            endpoint_name: endpoint_name.to_string(),
580            error: error.to_string(),
581        }
582    }
583
584    pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
585        Self::InvalidOutputBufferConfig {
586            endpoint_name: endpoint_name.to_string(),
587            error: error.to_string(),
588        }
589    }
590
591    pub fn cyclic_dependency(cycle: Vec<(String, String)>) -> Self {
592        Self::CyclicDependency { cycle }
593    }
594
595    pub fn empty_start_after(endpoint_name: &str) -> Self {
596        Self::EmptyStartAfter {
597            endpoint_name: endpoint_name.to_string(),
598        }
599    }
600}
601
602/// Controller error.
603///
604/// Reports all errors that arise from operating a streaming pipeline consisting
605/// of input adapters, output adapters, and a DBSP circuit, via the controller
606/// API.
607#[derive(Debug, Serialize)]
608#[serde(untagged)]
609pub enum ControllerError {
610    /// I/O error.
611    #[serde(serialize_with = "serialize_io_error")]
612    IoError {
613        /// Describes the context where the error occurred.
614        context: String,
615        io_error: IoError,
616        backtrace: Backtrace,
617    },
618
619    /// Error parsing program schema.
620    SchemaParseError {
621        error: String,
622    },
623
624    /// Error validating program schema.
625    SchemaValidationError {
626        error: String,
627    },
628
629    /// Error parsing the checkpoint.
630    CheckpointParseError {
631        error: String,
632    },
633
634    CheckpointDoesNotMatchPipeline,
635
636    /// Operation cannot be initiated now because the pipeline is being restored
637    /// from a checkpoint.
638    RestoreInProgress,
639
640    /// Operation cannot be initiated now because the pipeline is bootstrapping new/modified views.
641    BootstrapInProgress,
642
643    /// Error in journal metadata.
644    StepError(StepError),
645
646    /// Unexpected step number.
647    UnexpectedStep {
648        actual: Step,
649        expected: Step,
650    },
651
652    /// Step replay failure.
653    ReplayFailure {
654        error: String,
655    },
656
657    /// Feature is not supported.
658    NotSupported {
659        error: String,
660    },
661
662    /// Error parsing program IR file.
663    IrParseError {
664        error: String,
665    },
666
667    /// Error parsing CLI arguments.
668    CliArgsError {
669        error: String,
670    },
671
672    /// Invalid controller configuration.
673    Config {
674        config_error: Box<ConfigError>,
675    },
676
677    /// Unknown input endpoint name.
678    UnknownInputEndpoint {
679        endpoint_name: String,
680    },
681
682    /// Error creating a user-defined preprocessor
683    PreprocessorCreateError {
684        endpoint_name: String,
685        error: String,
686    },
687
688    /// Unknown output endpoint name.
689    UnknownOutputEndpoint {
690        endpoint_name: String,
691    },
692
693    /// Error parsing input data.
694    ///
695    /// Parser errors are expected to be
696    /// recoverable, i.e., the parser should be able to successfully parse
697    /// new valid inputs after an error.
698    ParseError {
699        endpoint_name: String,
700        error: Box<ParseError>,
701    },
702
703    /// Encode error.
704    ///
705    /// Error encoding the last output batch.  Encoder errors are expected to
706    /// be recoverable, i.e., the encoder should be able to successfully parse
707    /// new valid inputs after an error.
708    #[serde(serialize_with = "serialize_encode_error")]
709    EncodeError {
710        endpoint_name: String,
711        error: AnyError,
712    },
713
714    /// Input transport endpoint error.
715    #[serde(serialize_with = "serialize_input_transport_error")]
716    InputTransportError {
717        endpoint_name: String,
718        fatal: bool,
719        error: AnyError,
720    },
721
722    /// Output transport endpoint error.
723    #[serde(serialize_with = "serialize_output_transport_error")]
724    OutputTransportError {
725        endpoint_name: String,
726        fatal: bool,
727        error: AnyError,
728    },
729
730    CommandError {
731        endpoint_name: String,
732        error: String,
733    },
734
735    /// Error evaluating the DBSP circuit.
736    DbspError {
737        error: DbspError,
738    },
739
740    /// Error inside the Prometheus module.
741    PrometheusError {
742        error: String,
743    },
744
745    // TODO: we currently don't have a way to include more info about the panic.
746    /// Panic inside the DBSP runtime.
747    DbspPanic,
748
749    /// Panic inside the DBSP controller.
750    ControllerPanic,
751
752    /// Controller terminated before command could be executed.
753    ControllerExit,
754
755    /// Storage error.
756    #[serde(serialize_with = "serialize_storage_error")]
757    StorageError {
758        /// Describes the context where the error occurred.
759        context: String,
760        error: StorageError,
761        backtrace: Box<Backtrace>,
762    },
763
764    /// Enterprise-only feature.
765    EnterpriseFeature(&'static str),
766
767    /// Cannot checkpoint or suspend.
768    SuspendError(SuspendError),
769
770    /// An unexpected JSON serialized structure was encountered while processing the /stats endpoint.
771    UnexpectedJsonStructure {
772        reason: String,
773    },
774
775    /// The request relates to an old incarnation of the pipeline.
776    PipelineRestarted {
777        error: String,
778    },
779
780    /// Completion token specified non-existing endpoint id. This indicates that the endpoint was removed
781    /// or the token is invalid.
782    UnknownEndpointInCompletionToken {
783        endpoint_id: u64,
784    },
785
786    /// Error fetching checkpoint from remote object storage.
787    CheckpointFetchError {
788        error: String,
789    },
790
791    /// Error pushing checkpoint to remote object storage.
792    CheckpointPushError {
793        error: String,
794    },
795
796    TransactionInProgress,
797    NoTransactionInProgress,
798
799    /// Invalid initial desired status.
800    InvalidInitialStatus(RuntimeDesiredStatus),
801
802    /// Invalid standby configuration,
803    InvalidStandby(&'static str),
804
805    /// Invalid startup status transition.
806    InvalidStartupTransition {
807        from: RuntimeDesiredStatus,
808        to: RuntimeDesiredStatus,
809    },
810
811    BootstrapRejectedByUser,
812
813    BootstrapNotAllowed {
814        error: String,
815    },
816
817    UnexpectedBootstrap {
818        bootstrap_info: Option<BootstrapInfo>,
819    },
820
821    /// Unexpected runtime version
822    UnexpectedRuntimeVersion {
823        error: String,
824    },
825}
826
827impl ResponseError for ControllerError {
828    fn status_code(&self) -> StatusCode {
829        match self {
830            Self::Config { config_error }
831                if matches!(**config_error, ConfigError::UnknownInputStream { .. }) =>
832            {
833                StatusCode::NOT_FOUND
834            }
835            Self::Config { config_error }
836                if matches!(**config_error, ConfigError::UnknownOutputStream { .. }) =>
837            {
838                StatusCode::NOT_FOUND
839            }
840            Self::Config { .. } => StatusCode::BAD_REQUEST,
841            Self::UnknownInputEndpoint { .. } => StatusCode::NOT_FOUND,
842            Self::UnknownOutputEndpoint { .. } => StatusCode::NOT_FOUND,
843            Self::ParseError { .. } => StatusCode::BAD_REQUEST,
844            Self::NotSupported { .. } => StatusCode::BAD_REQUEST,
845            Self::EnterpriseFeature(_) => StatusCode::NOT_IMPLEMENTED,
846            Self::RestoreInProgress => StatusCode::SERVICE_UNAVAILABLE,
847            Self::BootstrapInProgress => StatusCode::SERVICE_UNAVAILABLE,
848            Self::PipelineRestarted { .. } => StatusCode::GONE,
849            Self::UnknownEndpointInCompletionToken { .. } => StatusCode::GONE,
850            Self::TransactionInProgress => StatusCode::CONFLICT,
851            Self::NoTransactionInProgress => StatusCode::BAD_REQUEST,
852            Self::InvalidInitialStatus(_) => StatusCode::GONE,
853            Self::BootstrapRejectedByUser => StatusCode::CONFLICT,
854            Self::UnexpectedBootstrap { .. } => StatusCode::INTERNAL_SERVER_ERROR,
855            Self::UnexpectedRuntimeVersion { .. } => StatusCode::INTERNAL_SERVER_ERROR,
856            _ => StatusCode::INTERNAL_SERVER_ERROR,
857        }
858    }
859
860    fn error_response(&self) -> HttpResponse<BoxBody> {
861        HttpResponseBuilder::new(self.status_code()).json(ErrorResponse::from_error(self))
862    }
863}
864
865fn serialize_io_error<S>(
866    context: &String,
867    io_error: &IoError,
868    backtrace: &Backtrace,
869    serializer: S,
870) -> Result<S::Ok, S::Error>
871where
872    S: Serializer,
873{
874    let mut ser = serializer.serialize_struct("IoError", 4)?;
875    ser.serialize_field("context", context)?;
876    ser.serialize_field("kind", &io_error.kind().to_string())?;
877    ser.serialize_field("os_error", &io_error.raw_os_error())?;
878    ser.serialize_field("backtrace", &backtrace.to_string())?;
879    ser.end()
880}
881
882fn serialize_storage_error<S>(
883    context: &String,
884    error: &StorageError,
885    backtrace: &Backtrace,
886    serializer: S,
887) -> Result<S::Ok, S::Error>
888where
889    S: Serializer,
890{
891    let mut ser = serializer.serialize_struct("StorageError", 4)?;
892    ser.serialize_field("context", context)?;
893    ser.serialize_field("error", &error.to_string())?;
894    ser.serialize_field("backtrace", &backtrace.to_string())?;
895    ser.end()
896}
897
898fn serialize_encode_error<S>(
899    endpoint: &String,
900    error: &AnyError,
901    serializer: S,
902) -> Result<S::Ok, S::Error>
903where
904    S: Serializer,
905{
906    let mut ser = serializer.serialize_struct("EncodeError", 3)?;
907    ser.serialize_field("endpoint_name", endpoint)?;
908    ser.serialize_field("error", &error.to_string())?;
909    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
910    ser.end()
911}
912
913fn serialize_input_transport_error<S>(
914    endpoint: &String,
915    fatal: &bool,
916    error: &AnyError,
917    serializer: S,
918) -> Result<S::Ok, S::Error>
919where
920    S: Serializer,
921{
922    let mut ser = serializer.serialize_struct("InputTransportError", 4)?;
923    ser.serialize_field("endpoint_name", endpoint)?;
924    ser.serialize_field("fatal", fatal)?;
925    ser.serialize_field("error", &error.to_string())?;
926    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
927    ser.end()
928}
929
930fn serialize_output_transport_error<S>(
931    endpoint: &String,
932    fatal: &bool,
933    error: &AnyError,
934    serializer: S,
935) -> Result<S::Ok, S::Error>
936where
937    S: Serializer,
938{
939    let mut ser = serializer.serialize_struct("OutputTransportError", 4)?;
940    ser.serialize_field("endpoint_name", endpoint)?;
941    ser.serialize_field("fatal", fatal)?;
942    ser.serialize_field("error", &error.to_string())?;
943    ser.serialize_field("backtrace", &error.backtrace().to_string())?;
944    ser.end()
945}
946
947impl DbspDetailedError for ControllerError {
948    // TODO: attempts to cast `AnyError` to `DetailedError`.
949    fn error_code(&self) -> Cow<'static, str> {
950        match self {
951            Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
952            Self::IoError { .. } => Cow::from("ControllerIoError"),
953            Self::NotSupported { .. } => Cow::from("NotSupported"),
954            Self::SchemaParseError { .. } => Cow::from("SchemaParseError"),
955            Self::SchemaValidationError { .. } => Cow::from("SchemaParseError"),
956            Self::CheckpointParseError { .. } => Cow::from("CheckpointParseError"),
957            Self::CheckpointDoesNotMatchPipeline => Cow::from("CheckpointDoesNotMatchPipeline"),
958            Self::RestoreInProgress => Cow::from("RestoreInProgress"),
959            Self::BootstrapInProgress => Cow::from("BootstrapInProgress"),
960            Self::StepError { .. } => Cow::from("StepError"),
961            Self::UnexpectedStep { .. } => Cow::from("UnexpectedStep"),
962            Self::ReplayFailure { .. } => Cow::from("ReplayFailure"),
963            Self::IrParseError { .. } => Cow::from("IrParseError"),
964            Self::CliArgsError { .. } => Cow::from("ControllerCliArgsError"),
965            Self::Config { config_error } => {
966                Cow::from(format!("ConfigError.{}", config_error.error_code()))
967            }
968            Self::UnknownInputEndpoint { .. } => Cow::from("UnknownInputEndpoint"),
969            Self::UnknownOutputEndpoint { .. } => Cow::from("UnknownOutputEndpoint"),
970            Self::ParseError { .. } => Cow::from("ParseError"),
971            Self::EncodeError { .. } => Cow::from("EncodeError"),
972            Self::InputTransportError { .. } => Cow::from("InputTransportError"),
973            Self::OutputTransportError { .. } => Cow::from("OutputTransportError"),
974            Self::CommandError { .. } => Cow::from("CommandError"),
975            Self::PrometheusError { .. } => Cow::from("PrometheusError"),
976            Self::DbspError { error } => error.error_code(),
977            Self::DbspPanic => Cow::from("DbspPanic"),
978            Self::ControllerPanic => Cow::from("ControllerPanic"),
979            Self::ControllerExit => Cow::from("ControllerExit"),
980            Self::EnterpriseFeature(_) => Cow::from("EnterpriseFeature"),
981            Self::StorageError { .. } => Cow::from("StorageError"),
982            Self::SuspendError(_) => Cow::from("SuspendError"),
983            Self::UnexpectedJsonStructure { .. } => Cow::from("UnexpectedJsonStructure"),
984            Self::PipelineRestarted { .. } => Cow::from("PipelineRestarted"),
985            Self::UnknownEndpointInCompletionToken { .. } => {
986                Cow::from("UnknownEndpointInCompletionToken")
987            }
988            Self::CheckpointFetchError { .. } => Cow::from("CheckpointFetchError"),
989            Self::CheckpointPushError { .. } => Cow::from("CheckpointPushError"),
990            Self::TransactionInProgress => Cow::from("TransactionInProgress"),
991            Self::NoTransactionInProgress => Cow::from("NoTransactionInProgress"),
992            Self::InvalidInitialStatus(_) => Cow::from("InvalidInitialStatus"),
993            Self::InvalidStandby(_) => Cow::from("InvalidStandby"),
994            Self::InvalidStartupTransition { .. } => Cow::from("InvalidStartupTransition"),
995            Self::BootstrapRejectedByUser => Cow::from("BootstrapRejected"),
996            Self::BootstrapNotAllowed { .. } => Cow::from("BootstrapNotAllowed"),
997            Self::UnexpectedBootstrap { .. } => Cow::from("UnexpectedBootstrap"),
998            Self::UnexpectedRuntimeVersion { .. } => Cow::from("UnexpectedRuntimeVersion"),
999        }
1000    }
1001}
1002
1003impl DetailedError for ControllerError {
1004    // TODO: attempts to cast `AnyError` to `DetailedError`.
1005    fn error_code(&self) -> Cow<'static, str> {
1006        DbspDetailedError::error_code(self)
1007    }
1008}
1009
1010impl StdError for ControllerError {}
1011
1012impl Display for ControllerError {
1013    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
1014        match self {
1015            Self::PreprocessorCreateError {
1016                endpoint_name,
1017                error,
1018            } => {
1019                write!(
1020                    f,
1021                    "Error creating preprocessor for endpoint {endpoint_name}: {error}"
1022                )
1023            }
1024            Self::IoError {
1025                context, io_error, ..
1026            } => {
1027                write!(f, "I/O error {context}: {io_error}")
1028            }
1029            Self::NotSupported { error } => {
1030                write!(f, "Not supported: {error}")
1031            }
1032            Self::SchemaParseError { error } => {
1033                write!(f, "Error parsing program schema: {error}")
1034            }
1035            Self::SchemaValidationError { error } => {
1036                write!(f, "Error validating program schema: {error}")
1037            }
1038            Self::CheckpointParseError { error } => {
1039                write!(f, "Error parsing checkpoint file: {error}")
1040            }
1041            Self::CheckpointDoesNotMatchPipeline => {
1042                write!(
1043                    f,
1044                    "Recovery failed: the pipeline has been recovered from a checkpoint, but the checkpoint does not match the current pipeline definition. This can be caused by a corrupted checkpoint or an internal error."
1045                )
1046            }
1047            Self::RestoreInProgress => {
1048                write!(
1049                    f,
1050                    "Operation cannot be initiated now because the pipeline is restoring from a checkpoint."
1051                )
1052            }
1053            Self::BootstrapInProgress => {
1054                write!(
1055                    f,
1056                    "Operation cannot be initiated while the pipeline is bootstrapping."
1057                )
1058            }
1059            Self::StepError(error) => write!(f, "Error with persistent input steps: {error}"),
1060            Self::UnexpectedStep { actual, expected } => {
1061                write!(f, "Read step {actual}, expected {expected}")
1062            }
1063            Self::ReplayFailure { error } => {
1064                write!(f, "{error}")
1065            }
1066            Self::IrParseError { error } => {
1067                write!(f, "Error parsing program IR: {error}")
1068            }
1069            Self::CliArgsError { error } => {
1070                write!(f, "Error parsing command line arguments: {error}")
1071            }
1072            Self::Config { config_error } => {
1073                write!(f, "invalid controller configuration: {config_error}")
1074            }
1075            Self::UnknownInputEndpoint { endpoint_name } => {
1076                write!(f, "unknown input endpoint name '{endpoint_name}'")
1077            }
1078            Self::UnknownOutputEndpoint { endpoint_name } => {
1079                write!(f, "unknown output endpoint name '{endpoint_name}'")
1080            }
1081            Self::InputTransportError {
1082                endpoint_name,
1083                fatal,
1084                error,
1085            } => {
1086                write!(
1087                    f,
1088                    "{}error on input endpoint '{endpoint_name}': {}",
1089                    if *fatal { "FATAL " } else { "" },
1090                    error.root_cause()
1091                )
1092            }
1093            Self::OutputTransportError {
1094                endpoint_name,
1095                fatal,
1096                error,
1097            } => {
1098                write!(
1099                    f,
1100                    "{}error on output endpoint '{endpoint_name}': {}",
1101                    if *fatal { "FATAL " } else { "" },
1102                    error.root_cause()
1103                )
1104            }
1105            Self::CommandError {
1106                endpoint_name,
1107                error,
1108            } => {
1109                write!(
1110                    f,
1111                    "error executing command on output endpoint '{endpoint_name}': {error}"
1112                )
1113            }
1114            Self::ParseError {
1115                endpoint_name,
1116                error,
1117            } => {
1118                write!(
1119                    f,
1120                    "parse error on input endpoint '{endpoint_name}': {error}"
1121                )
1122            }
1123            Self::EncodeError {
1124                endpoint_name,
1125                error,
1126            } => {
1127                write!(
1128                    f,
1129                    "encoder error on output endpoint '{endpoint_name}': {error}"
1130                )
1131            }
1132            Self::PrometheusError { error } => {
1133                write!(f, "Error in the Prometheus metrics module: '{error}'")
1134            }
1135            Self::DbspError { error } => {
1136                write!(f, "DBSP error: {error}")
1137            }
1138            Self::DbspPanic => {
1139                write!(f, "Panic inside the DBSP runtime")
1140            }
1141            Self::ControllerPanic => {
1142                write!(f, "Panic inside the DBSP controller")
1143            }
1144            Self::ControllerExit => {
1145                write!(f, "Controller exited before command could be executed")
1146            }
1147            Self::EnterpriseFeature(feature) => {
1148                write!(
1149                    f,
1150                    "Cannot use enterprise-only feature ({feature}) in Feldera community edition."
1151                )
1152            }
1153            Self::StorageError { context, error, .. } => {
1154                write!(f, "I/O error {context}: {error}")
1155            }
1156            Self::SuspendError(error) => write!(f, "{error}"),
1157            Self::UnexpectedJsonStructure { reason } => {
1158                write!(f, "An unexpected JSON structure was detected: {reason}")
1159            }
1160            Self::PipelineRestarted { error } => {
1161                write!(f, "{error}")
1162            }
1163            Self::UnknownEndpointInCompletionToken { endpoint_id } => {
1164                write!(
1165                    f,
1166                    "completion token specifies input endpoint id {endpoint_id}, which doesn't exist; this indicates that the input connector was deleted after the completion token was generated"
1167                )
1168            }
1169            Self::CheckpointFetchError { error } => {
1170                write!(f, "Error fetching checkpoint from object store: {error}")
1171            }
1172            Self::CheckpointPushError { error } => {
1173                write!(f, "Error pushing checkpoint to object store: {error}")
1174            }
1175            Self::TransactionInProgress => {
1176                write!(
1177                    f,
1178                    "Cannot perform this operation while there is a transaction in progress"
1179                )
1180            }
1181            Self::NoTransactionInProgress => {
1182                write!(
1183                    f,
1184                    "This operation requires an active transaction, but none is currently in progress"
1185                )
1186            }
1187            Self::InvalidInitialStatus(status) => {
1188                write!(
1189                    f,
1190                    "Invalid initial status {status:?} provided on command line or read from storage (only coordination, running, paused, and standby are valid)"
1191                )
1192            }
1193            Self::InvalidStandby(standby) => {
1194                write!(f, "Cannot enter standby mode: {standby}")
1195            }
1196            Self::InvalidStartupTransition { from, to } => {
1197                write!(
1198                    f,
1199                    "Invalid pipeline startup transition from {from:?} to {to:?}"
1200                )
1201            }
1202            Self::BootstrapRejectedByUser => {
1203                write!(
1204                    f,
1205                    "Bootstrapping of the modified pipeline was rejected by the user."
1206                )
1207            }
1208            Self::BootstrapNotAllowed { error } => {
1209                write!(
1210                    f,
1211                    "The pipeline has been modified since the last checkpoint; however it cannot be bootstrapped due to the following reasons:\n{error}"
1212                )
1213            }
1214            Self::UnexpectedBootstrap { bootstrap_info } => {
1215                write!(
1216                    f,
1217                    "The pipeline's query plan was modified since the last checkpoint and requires bootstrapping; however we were not able to detect the changes in the pipeline metadata. This is a bug; please report to the developers. Bootstrap info: {bootstrap_info:?}"
1218                )
1219            }
1220            Self::UnexpectedRuntimeVersion { error } => {
1221                write!(f, "{error}")
1222            }
1223        }
1224    }
1225}
1226
1227impl ControllerError {
1228    pub fn io_error(context: impl Display, io_error: IoError) -> Self {
1229        Self::IoError {
1230            context: context.to_string(),
1231            io_error,
1232            backtrace: Backtrace::capture(),
1233        }
1234    }
1235
1236    pub fn not_supported(error: &str) -> Self {
1237        Self::NotSupported {
1238            error: error.to_string(),
1239        }
1240    }
1241
1242    pub fn schema_parse_error(error: &str) -> Self {
1243        Self::SchemaParseError {
1244            error: error.to_string(),
1245        }
1246    }
1247
1248    pub fn checkpoint_does_not_match_pipeline() -> Self {
1249        Self::CheckpointDoesNotMatchPipeline
1250    }
1251
1252    pub fn checkpoint_fetch_error(error: String) -> Self {
1253        Self::CheckpointFetchError { error }
1254    }
1255
1256    pub fn checkpoint_push_error(error: String) -> Self {
1257        Self::CheckpointPushError { error }
1258    }
1259
1260    pub fn schema_validation_error(error: &str) -> Self {
1261        Self::SchemaValidationError {
1262            error: error.to_string(),
1263        }
1264    }
1265
1266    pub fn ir_parse_error(error: &str) -> Self {
1267        Self::IrParseError {
1268            error: error.to_string(),
1269        }
1270    }
1271
1272    pub fn cli_args_error<E>(error: &E) -> Self
1273    where
1274        E: ToString,
1275    {
1276        Self::CliArgsError {
1277            error: error.to_string(),
1278        }
1279    }
1280
1281    pub fn unknown_input_endpoint(endpoint_name: &str) -> Self {
1282        Self::UnknownInputEndpoint {
1283            endpoint_name: endpoint_name.to_string(),
1284        }
1285    }
1286
1287    pub fn unknown_output_endpoint(endpoint_name: &str) -> Self {
1288        Self::UnknownOutputEndpoint {
1289            endpoint_name: endpoint_name.to_string(),
1290        }
1291    }
1292
1293    pub fn pipeline_config_parse_error<E>(error: &E) -> Self
1294    where
1295        E: ToString,
1296    {
1297        Self::Config {
1298            config_error: Box::new(ConfigError::pipeline_config_parse_error(error)),
1299        }
1300    }
1301
1302    pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1303    where
1304        E: ToString,
1305    {
1306        Self::Config {
1307            config_error: Box::new(ConfigError::parser_config_parse_error(
1308                endpoint_name,
1309                error,
1310                config,
1311            )),
1312        }
1313    }
1314
1315    pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1316    where
1317        E: ToString,
1318    {
1319        Self::Config {
1320            config_error: Box::new(ConfigError::encoder_config_parse_error(
1321                endpoint_name,
1322                error,
1323                config,
1324            )),
1325        }
1326    }
1327
1328    pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
1329        Self::Config {
1330            config_error: Box::new(ConfigError::duplicate_input_endpoint(endpoint_name)),
1331        }
1332    }
1333
1334    pub fn duplicate_input_stream(stream_name: &str) -> Self {
1335        Self::Config {
1336            config_error: Box::new(ConfigError::duplicate_input_stream(stream_name)),
1337        }
1338    }
1339
1340    pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
1341        Self::Config {
1342            config_error: Box::new(ConfigError::unknown_input_format(
1343                endpoint_name,
1344                format_name,
1345            )),
1346        }
1347    }
1348
1349    pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
1350        Self::Config {
1351            config_error: Box::new(ConfigError::unknown_input_transport(
1352                endpoint_name,
1353                transport_name,
1354            )),
1355        }
1356    }
1357
1358    pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
1359        Self::Config {
1360            config_error: Box::new(ConfigError::duplicate_output_endpoint(endpoint_name)),
1361        }
1362    }
1363
1364    pub fn duplicate_output_stream(stream_name: &str) -> Self {
1365        Self::Config {
1366            config_error: Box::new(ConfigError::duplicate_output_stream(stream_name)),
1367        }
1368    }
1369
1370    pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
1371        Self::Config {
1372            config_error: Box::new(ConfigError::unknown_output_format(
1373                endpoint_name,
1374                format_name,
1375            )),
1376        }
1377    }
1378
1379    pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
1380        Self::Config {
1381            config_error: Box::new(ConfigError::unknown_output_transport(
1382                endpoint_name,
1383                transport_name,
1384            )),
1385        }
1386    }
1387
1388    pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
1389        Self::Config {
1390            config_error: Box::new(ConfigError::unknown_input_stream(
1391                endpoint_name,
1392                stream_name,
1393            )),
1394        }
1395    }
1396
1397    pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
1398        Self::Config {
1399            config_error: Box::new(ConfigError::unknown_output_stream(
1400                endpoint_name,
1401                stream_name,
1402            )),
1403        }
1404    }
1405
1406    pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
1407        Self::Config {
1408            config_error: Box::new(ConfigError::unknown_index(endpoint_name, index_name)),
1409        }
1410    }
1411
1412    pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
1413        Self::Config {
1414            config_error: Box::new(ConfigError::not_an_index(endpoint_name, index_name)),
1415        }
1416    }
1417
1418    pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1419        Self::Config {
1420            config_error: Box::new(ConfigError::input_format_not_supported(
1421                endpoint_name,
1422                error,
1423            )),
1424        }
1425    }
1426
1427    pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1428        Self::Config {
1429            config_error: Box::new(ConfigError::output_format_not_supported(
1430                endpoint_name,
1431                error,
1432            )),
1433        }
1434    }
1435
1436    pub fn input_format_not_specified(endpoint_name: &str) -> Self {
1437        Self::Config {
1438            config_error: Box::new(ConfigError::input_format_not_specified(endpoint_name)),
1439        }
1440    }
1441
1442    pub fn output_format_not_specified(endpoint_name: &str) -> Self {
1443        Self::Config {
1444            config_error: Box::new(ConfigError::output_format_not_specified(endpoint_name)),
1445        }
1446    }
1447
1448    pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
1449        Self::Config {
1450            config_error: Box::new(ConfigError::invalid_encoder_configuration(
1451                endpoint_name,
1452                error,
1453            )),
1454        }
1455    }
1456
1457    pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
1458        Self::Config {
1459            config_error: Box::new(ConfigError::invalid_parser_configuration(
1460                endpoint_name,
1461                error,
1462            )),
1463        }
1464    }
1465
1466    pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
1467        Self::Config {
1468            config_error: Box::new(ConfigError::invalid_transport_configuration(
1469                endpoint_name,
1470                error,
1471            )),
1472        }
1473    }
1474
1475    pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
1476        Self::Config {
1477            config_error: Box::new(ConfigError::invalid_output_buffer_configuration(
1478                endpoint_name,
1479                error,
1480            )),
1481        }
1482    }
1483
1484    pub fn input_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1485        Self::InputTransportError {
1486            endpoint_name: endpoint_name.to_owned(),
1487            fatal,
1488            error,
1489        }
1490    }
1491
1492    pub fn output_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1493        Self::OutputTransportError {
1494            endpoint_name: endpoint_name.to_owned(),
1495            fatal,
1496            error,
1497        }
1498    }
1499
1500    pub fn command_error(endpoint_name: &str, error: &str) -> Self {
1501        Self::CommandError {
1502            endpoint_name: endpoint_name.to_owned(),
1503            error: error.to_string(),
1504        }
1505    }
1506
1507    pub fn parse_error(endpoint_name: &str, error: ParseError) -> Self {
1508        Self::ParseError {
1509            endpoint_name: endpoint_name.to_owned(),
1510            error: Box::new(error),
1511        }
1512    }
1513
1514    pub fn encode_error(endpoint_name: &str, error: AnyError) -> Self {
1515        Self::EncodeError {
1516            endpoint_name: endpoint_name.to_owned(),
1517            error,
1518        }
1519    }
1520
1521    pub fn prometheus_error<E>(error: &E) -> Self
1522    where
1523        E: ToString,
1524    {
1525        Self::PrometheusError {
1526            error: error.to_string(),
1527        }
1528    }
1529
1530    pub fn dbsp_error(error: DbspError) -> Self {
1531        Self::DbspError { error }
1532    }
1533
1534    pub fn dbsp_panic() -> Self {
1535        Self::DbspPanic
1536    }
1537
1538    pub fn controller_panic() -> Self {
1539        Self::ControllerPanic
1540    }
1541
1542    pub fn storage_error(context: impl Display, error: StorageError) -> Self {
1543        Self::StorageError {
1544            context: context.to_string(),
1545            error,
1546            backtrace: Box::new(Backtrace::capture()),
1547        }
1548    }
1549
1550    pub fn pipeline_restarted(error: &str) -> Self {
1551        Self::PipelineRestarted {
1552            error: error.to_string(),
1553        }
1554    }
1555
1556    pub fn unknown_endpoint_in_completion_token(endpoint_id: u64) -> Self {
1557        Self::UnknownEndpointInCompletionToken { endpoint_id }
1558    }
1559
1560    pub fn kind(&self) -> ErrorKind {
1561        match self {
1562            Self::IoError { io_error, .. } => io_error.kind(),
1563            Self::StorageError { error, .. } => error.kind(),
1564            Self::StepError(error) => error.kind(),
1565            Self::RestoreInProgress
1566            | Self::BootstrapInProgress
1567            | Self::SuspendError(SuspendError::Temporary(_))
1568            | Self::TransactionInProgress => ErrorKind::ResourceBusy,
1569            Self::NotSupported { .. } | Self::SuspendError(SuspendError::Permanent(_)) => {
1570                ErrorKind::Unsupported
1571            }
1572            Self::DbspError {
1573                error: DbspError::IO(error),
1574            } => error.kind(),
1575            Self::DbspError { .. }
1576            | Self::SchemaParseError { .. }
1577            | Self::SchemaValidationError { .. }
1578            | Self::CheckpointParseError { .. }
1579            | Self::CheckpointDoesNotMatchPipeline
1580            | Self::UnexpectedStep { .. }
1581            | Self::ReplayFailure { .. }
1582            | Self::IrParseError { .. }
1583            | Self::CliArgsError { .. }
1584            | Self::Config { .. }
1585            | Self::UnknownInputEndpoint { .. }
1586            | Self::UnknownOutputEndpoint { .. }
1587            | Self::ParseError { .. }
1588            | Self::EncodeError { .. }
1589            | Self::InputTransportError { .. }
1590            | Self::OutputTransportError { .. }
1591            | Self::CommandError { .. }
1592            | Self::PrometheusError { .. }
1593            | Self::DbspPanic
1594            | Self::ControllerPanic
1595            | Self::ControllerExit
1596            | Self::EnterpriseFeature(_)
1597            | Self::UnexpectedJsonStructure { .. }
1598            | Self::UnknownEndpointInCompletionToken { .. }
1599            | Self::CheckpointFetchError { .. }
1600            | Self::CheckpointPushError { .. }
1601            | Self::PipelineRestarted { .. }
1602            | Self::NoTransactionInProgress
1603            | Self::InvalidInitialStatus(_)
1604            | Self::BootstrapRejectedByUser
1605            | Self::BootstrapNotAllowed { .. }
1606            | Self::UnexpectedBootstrap { .. }
1607            | Self::InvalidStandby(_)
1608            | Self::InvalidStartupTransition { .. }
1609            | Self::PreprocessorCreateError { .. }
1610            | Self::UnexpectedRuntimeVersion { .. } => ErrorKind::Other,
1611        }
1612    }
1613}
1614
1615impl From<ConfigError> for ControllerError {
1616    fn from(config_error: ConfigError) -> Self {
1617        Self::Config {
1618            config_error: Box::new(config_error),
1619        }
1620    }
1621}
1622
1623impl From<DbspError> for ControllerError {
1624    fn from(error: DbspError) -> Self {
1625        Self::DbspError { error }
1626    }
1627}
1628
1629impl From<SuspendError> for ControllerError {
1630    fn from(error: SuspendError) -> Self {
1631        Self::SuspendError(error)
1632    }
1633}