1use std::{
2 backtrace::Backtrace,
3 borrow::Cow,
4 error::Error as StdError,
5 fmt::{Display, Error as FmtError, Formatter},
6 io::{Error as IoError, ErrorKind},
7 string::ToString,
8};
9
10use actix_web::body::BoxBody;
11use actix_web::http::StatusCode;
12use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError};
13use anyhow::Error as AnyError;
14use dbsp::{
15 Error as DbspError,
16 circuit::{LayoutError, circuit_builder::BootstrapInfo},
17 storage::backend::StorageError,
18};
19use feldera_types::{
20 error::{DetailedError, ErrorResponse},
21 runtime_status::RuntimeDesiredStatus,
22 suspend::SuspendError,
23};
24use serde::{Serialize, Serializer, ser::SerializeStruct};
25
26use super::journal::StepError;
27use crate::{DbspDetailedError, format::ParseError, transport::Step};
28
/// Errors detected while validating or parsing controller/pipeline
/// configuration.
///
/// The enum is serialized with `#[serde(untagged)]`, i.e. without a variant
/// tag. The human-readable text for each variant comes from the `Display`
/// implementation in this file.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ConfigError {
    /// The pipeline configuration could not be parsed.
    PipelineConfigParseError {
        /// Stringified parse error.
        error: String,
    },

    /// The format (parser) configuration of an input endpoint could not be
    /// parsed.
    ParserConfigParseError {
        /// Name of the input endpoint.
        endpoint_name: String,
        /// Stringified parse error.
        error: String,
        /// The configuration text that failed to parse.
        config: String,
    },

    /// The format (encoder) configuration of an output endpoint could not be
    /// parsed.
    EncoderConfigParseError {
        /// Name of the output endpoint.
        endpoint_name: String,
        /// Stringified parse error.
        error: String,
        /// The configuration text that failed to parse.
        config: String,
    },

    /// An input endpoint with this name already exists.
    DuplicateInputEndpoint {
        endpoint_name: String,
    },

    /// Duplicate table name.
    DuplicateInputStream {
        stream_name: String,
    },

    /// An output endpoint with this name already exists.
    DuplicateOutputEndpoint {
        endpoint_name: String,
    },

    /// Duplicate table or view name.
    DuplicateOutputStream {
        stream_name: String,
    },

    /// An input endpoint specifies an unknown input format.
    UnknownInputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// An output endpoint specifies an unknown output format.
    UnknownOutputFormat {
        endpoint_name: String,
        format_name: String,
    },

    /// An input endpoint specifies an unknown input transport.
    UnknownInputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// An output endpoint specifies an unknown output transport.
    UnknownOutputTransport {
        endpoint_name: String,
        transport_name: String,
    },

    /// An input endpoint specifies an unknown table.
    ///
    /// Mapped to HTTP 404 by `ControllerError::status_code`.
    UnknownInputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// An output endpoint specifies an unknown output table or view.
    ///
    /// Mapped to HTTP 404 by `ControllerError::status_code`.
    UnknownOutputStream {
        endpoint_name: String,
        stream_name: String,
    },

    /// An output endpoint refers to an index name that is not known.
    UnknownIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// An output endpoint refers, as an index, to a relation that is not an
    /// index.
    NotAnIndex {
        endpoint_name: String,
        index_name: String,
    },

    /// The format is not supported on the given input endpoint.
    InputFormatNotSupported {
        endpoint_name: String,
        /// Explanation of why the format is not supported.
        error: String,
    },

    /// The format is not supported on the given output endpoint.
    OutputFormatNotSupported {
        endpoint_name: String,
        /// Explanation of why the format is not supported.
        error: String,
    },

    /// No data format is specified for an input endpoint (the `format` field
    /// of the connector configuration is missing).
    InputFormatNotSpecified {
        endpoint_name: String,
    },

    /// No data format is specified for an output endpoint (the `format` field
    /// of the connector configuration is missing).
    OutputFormatNotSpecified {
        endpoint_name: String,
    },

    /// Invalid format configuration for an output endpoint.
    InvalidEncoderConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid format configuration for an input endpoint.
    InvalidParserConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid transport configuration for an endpoint.
    InvalidTransportConfig {
        endpoint_name: String,
        error: String,
    },

    /// Invalid output buffer configuration for an endpoint.
    InvalidOutputBufferConfig {
        endpoint_name: String,
        error: String,
    },

    /// A cyclic `start_after` dependency between endpoints was detected.
    CyclicDependency {
        /// The cycle as a list of `(endpoint, label)` pairs; each entry waits
        /// for the next one, and the last entry waits for the first.
        cycle: Vec<(String, String)>,
    },

    /// The `start_after` field of an input endpoint is empty.
    EmptyStartAfter {
        endpoint_name: String,
    },

    /// Fault tolerance is configured, which requires storage, but storage is
    /// not enabled.
    FtRequiresStorage,
    /// Fault tolerance is configured, but the pipeline has at least one
    /// non-fault-tolerant input adapter.
    FtRequiresFtInput,

    /// `datafusion_memory_mb` is not less than the pipeline's memory budget.
    DatafusionMemoryExceedsBudget {
        /// Configured DataFusion memory, in MB.
        datafusion_memory_mb: u64,
        /// Pipeline memory budget, in MB.
        max_rss_mb: u64,
    },

    /// Multihost layout error.
    InvalidLayout(LayoutError),
}
181
// Marker impl: `ConfigError` relies on the default `std::error::Error`
// methods; the `Debug` and `Display` impls it requires are provided in this
// file.
impl StdError for ConfigError {}
183
184impl DbspDetailedError for ConfigError {
185 fn error_code(&self) -> Cow<'static, str> {
186 match self {
187 Self::PipelineConfigParseError { .. } => Cow::from("PipelineConfigParseError"),
188 Self::ParserConfigParseError { .. } => Cow::from("ParserConfigParseError"),
189 Self::EncoderConfigParseError { .. } => Cow::from("EncoderConfigParseError"),
190 Self::DuplicateInputEndpoint { .. } => Cow::from("DuplicateInputEndpoint"),
191 Self::DuplicateInputStream { .. } => Cow::from("DuplicateInputStream"),
192 Self::DuplicateOutputEndpoint { .. } => Cow::from("DuplicateOutputEndpoint"),
193 Self::DuplicateOutputStream { .. } => Cow::from("DuplicateOutputStream"),
194 Self::UnknownInputFormat { .. } => Cow::from("UnknownInputFormat"),
195 Self::UnknownOutputFormat { .. } => Cow::from("UnknownOutputFormat"),
196 Self::UnknownInputTransport { .. } => Cow::from("UnknownInputTransport"),
197 Self::UnknownOutputTransport { .. } => Cow::from("UnknownOutputTransport"),
198 Self::UnknownInputStream { .. } => Cow::from("UnknownInputStream"),
199 Self::UnknownOutputStream { .. } => Cow::from("UnknownOutputStream"),
200 Self::UnknownIndex { .. } => Cow::from("UnknownIndex"),
201 Self::NotAnIndex { .. } => Cow::from("NotAnIndex"),
202 Self::InputFormatNotSupported { .. } => Cow::from("InputFormatNotSupported"),
203 Self::OutputFormatNotSupported { .. } => Cow::from("OutputFormatNotSupported"),
204 Self::InputFormatNotSpecified { .. } => Cow::from("InputFormatNotSpecified"),
205 Self::OutputFormatNotSpecified { .. } => Cow::from("OutputFormatNotSpecified"),
206 Self::InvalidEncoderConfig { .. } => Cow::from("InvalidEncoderConfig"),
207 Self::InvalidParserConfig { .. } => Cow::from("InvalidParserConfig"),
208 Self::InvalidTransportConfig { .. } => Cow::from("InvalidTransportConfig"),
209 Self::InvalidOutputBufferConfig { .. } => Cow::from("InvalidOutputBufferConfig"),
210 Self::FtRequiresStorage => Cow::from("FtRequiresStorage"),
211 Self::FtRequiresFtInput => Cow::from("FtWithNonFtInput"),
212 Self::CyclicDependency { .. } => Cow::from("CyclicDependency"),
213 Self::EmptyStartAfter { .. } => Cow::from("EmptyStartAfter"),
214 Self::DatafusionMemoryExceedsBudget { .. } => {
215 Cow::from("DatafusionMemoryExceedsBudget")
216 }
217 Self::InvalidLayout(_) => Cow::from("LayoutError"),
218 }
219 }
220}
221
222impl Display for ConfigError {
223 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
224 match self {
225 Self::PipelineConfigParseError { error } => {
226 write!(f, "Failed to parse pipeline configuration: {error}")
227 }
228 Self::ParserConfigParseError {
229 endpoint_name,
230 error,
231 config,
232 } => {
233 write!(
234 f,
235 "Error parsing format configuration for input endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
236 )
237 }
238 Self::EncoderConfigParseError {
239 endpoint_name,
240 error,
241 config,
242 } => {
243 write!(
244 f,
245 "Error parsing format configuration for output endpoint '{endpoint_name}': {error}\nInvalid configuration: {config}"
246 )
247 }
248 Self::DuplicateInputEndpoint { endpoint_name } => {
249 write!(f, "Input endpoint '{endpoint_name}' already exists")
250 }
251 Self::DuplicateInputStream { stream_name } => {
252 write!(f, "Duplicate table name '{stream_name}'")
253 }
254 Self::UnknownInputFormat {
255 endpoint_name,
256 format_name,
257 } => {
258 write!(
259 f,
260 "Input endpoint '{endpoint_name}' specifies unknown input format '{format_name}'"
261 )
262 }
263 Self::UnknownInputTransport {
264 endpoint_name,
265 transport_name,
266 } => {
267 write!(
268 f,
269 "Input endpoint '{endpoint_name}' specifies unknown input transport '{transport_name}'"
270 )
271 }
272 Self::DuplicateOutputEndpoint { endpoint_name } => {
273 write!(f, "Output endpoint '{endpoint_name}' already exists")
274 }
275 Self::DuplicateOutputStream { stream_name } => {
276 write!(f, "Duplicate table or view name '{stream_name}'")
277 }
278 Self::UnknownOutputFormat {
279 endpoint_name,
280 format_name,
281 } => {
282 write!(
283 f,
284 "Output endpoint '{endpoint_name}' specifies unknown output format '{format_name}'"
285 )
286 }
287 Self::UnknownOutputTransport {
288 endpoint_name,
289 transport_name,
290 } => {
291 write!(
292 f,
293 "Output endpoint '{endpoint_name}' specifies unknown output transport '{transport_name}'"
294 )
295 }
296 Self::UnknownInputStream {
297 endpoint_name,
298 stream_name,
299 } => {
300 write!(
301 f,
302 "Input endpoint '{endpoint_name}' specifies unknown table '{stream_name}'"
303 )
304 }
305 Self::UnknownOutputStream {
306 endpoint_name,
307 stream_name,
308 } => {
309 write!(
310 f,
311 "Output endpoint '{endpoint_name}' specifies unknown output table or view '{stream_name}'"
312 )
313 }
314 Self::UnknownIndex {
315 endpoint_name,
316 index_name,
317 } => {
318 write!(
319 f,
320 "Output endpoint '{endpoint_name}' specifies index name '{index_name}'; however, the '{index_name}' relation is not an index"
321 )
322 }
323
324 Self::NotAnIndex {
325 endpoint_name,
326 index_name,
327 } => {
328 write!(
329 f,
330 "Output endpoint '{endpoint_name}' specifies unknown index '{index_name}'"
331 )
332 }
333
334 Self::InputFormatNotSupported {
335 endpoint_name,
336 error,
337 } => {
338 write!(
339 f,
340 "Format not supported on input endpoint '{endpoint_name}': {error}"
341 )
342 }
343 Self::OutputFormatNotSupported {
344 endpoint_name,
345 error,
346 } => {
347 write!(
348 f,
349 "Format not supported on output endpoint '{endpoint_name}': {error}"
350 )
351 }
352 Self::InputFormatNotSpecified { endpoint_name } => {
353 write!(
354 f,
355 "Data format is not specified for input endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
356 )
357 }
358 Self::OutputFormatNotSpecified { endpoint_name } => {
359 write!(
360 f,
361 "Data format is not specified for output endpoint '{endpoint_name}' (set the 'format' field inside connector configuration)"
362 )
363 }
364 Self::InvalidEncoderConfig {
365 endpoint_name,
366 error,
367 } => {
368 write!(
369 f,
370 "invalid format configuration for output endpoint '{endpoint_name}': {error}"
371 )
372 }
373 Self::InvalidParserConfig {
374 endpoint_name,
375 error,
376 } => {
377 write!(
378 f,
379 "invalid format configuration for input endpoint '{endpoint_name}': {error}"
380 )
381 }
382 Self::InvalidTransportConfig {
383 endpoint_name,
384 error,
385 } => {
386 write!(
387 f,
388 "invalid transport configuration for endpoint '{endpoint_name}': {error}"
389 )
390 }
391 Self::InvalidOutputBufferConfig {
392 endpoint_name,
393 error,
394 } => {
395 write!(
396 f,
397 "invalid output buffer configuration for endpoint '{endpoint_name}': {error}"
398 )
399 }
400 Self::CyclicDependency { cycle } => {
401 let mut cycle = cycle.clone();
402 cycle.push(cycle[0].clone());
403 let tail = cycle[1..]
404 .iter()
405 .map(|(endpoint, label)| {
406 format!("waits for endpoint '{endpoint}' with label '{label}'")
407 })
408 .collect::<Vec<_>>()
409 .join(", which ");
410 write!(
411 f,
412 "cyclic 'start_after' dependency detected: endpoint '{}' with label '{}' {}",
413 cycle[0].0, cycle[0].1, tail
414 )
415 }
416 Self::EmptyStartAfter { endpoint_name } => {
417 write!(
418 f,
419 "empty 'start_after' field for input endpoint '{}'",
420 endpoint_name
421 )
422 }
423 Self::FtRequiresStorage => write!(
424 f,
425 "Fault tolerance is configured, which requires storage, but storage is not enabled"
426 ),
427 Self::FtRequiresFtInput => write!(
428 f,
429 "Fault tolerance is configured, but it cannot be enabled because the pipeline has at least one non-fault-tolerant input adapter"
430 ),
431 Self::DatafusionMemoryExceedsBudget {
432 datafusion_memory_mb,
433 max_rss_mb,
434 } => write!(
435 f,
436 "'datafusion_memory_mb' ({datafusion_memory_mb} MB) must be less than the pipeline's memory budget ({max_rss_mb} MB); the difference is the budget available to the DBSP circuit"
437 ),
438 Self::InvalidLayout(e) => write!(f, "Multihost layout error: {e}"),
439 }
440 }
441}
442
443impl ConfigError {
444 pub fn pipeline_config_parse_error<E>(error: &E) -> Self
445 where
446 E: ToString,
447 {
448 Self::PipelineConfigParseError {
449 error: error.to_string(),
450 }
451 }
452
453 pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
454 where
455 E: ToString,
456 {
457 Self::ParserConfigParseError {
458 endpoint_name: endpoint_name.to_owned(),
459 error: error.to_string(),
460 config: config.to_string(),
461 }
462 }
463
464 pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
465 where
466 E: ToString,
467 {
468 Self::EncoderConfigParseError {
469 endpoint_name: endpoint_name.to_owned(),
470 error: error.to_string(),
471 config: config.to_string(),
472 }
473 }
474
475 pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
476 Self::DuplicateInputEndpoint {
477 endpoint_name: endpoint_name.to_owned(),
478 }
479 }
480
481 pub fn duplicate_input_stream(stream_name: &str) -> Self {
482 Self::DuplicateInputStream {
483 stream_name: stream_name.to_owned(),
484 }
485 }
486
487 pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
488 Self::UnknownInputFormat {
489 endpoint_name: endpoint_name.to_owned(),
490 format_name: format_name.to_owned(),
491 }
492 }
493
494 pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
495 Self::UnknownInputTransport {
496 endpoint_name: endpoint_name.to_owned(),
497 transport_name: transport_name.to_owned(),
498 }
499 }
500
501 pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
502 Self::DuplicateOutputEndpoint {
503 endpoint_name: endpoint_name.to_owned(),
504 }
505 }
506
507 pub fn duplicate_output_stream(stream_name: &str) -> Self {
508 Self::DuplicateOutputStream {
509 stream_name: stream_name.to_owned(),
510 }
511 }
512
513 pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
514 Self::UnknownOutputFormat {
515 endpoint_name: endpoint_name.to_owned(),
516 format_name: format_name.to_owned(),
517 }
518 }
519
520 pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
521 Self::UnknownOutputTransport {
522 endpoint_name: endpoint_name.to_owned(),
523 transport_name: transport_name.to_owned(),
524 }
525 }
526
527 pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
528 Self::UnknownInputStream {
529 endpoint_name: endpoint_name.to_owned(),
530 stream_name: stream_name.to_owned(),
531 }
532 }
533
534 pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
535 Self::UnknownOutputStream {
536 endpoint_name: endpoint_name.to_owned(),
537 stream_name: stream_name.to_owned(),
538 }
539 }
540
541 pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
542 Self::UnknownIndex {
543 endpoint_name: endpoint_name.to_owned(),
544 index_name: index_name.to_owned(),
545 }
546 }
547
548 pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
549 Self::NotAnIndex {
550 endpoint_name: endpoint_name.to_owned(),
551 index_name: index_name.to_owned(),
552 }
553 }
554
555 pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
556 Self::InputFormatNotSupported {
557 endpoint_name: endpoint_name.to_owned(),
558 error: error.to_owned(),
559 }
560 }
561
562 pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
563 Self::OutputFormatNotSupported {
564 endpoint_name: endpoint_name.to_owned(),
565 error: error.to_owned(),
566 }
567 }
568
569 pub fn input_format_not_specified(endpoint_name: &str) -> Self {
570 Self::InputFormatNotSpecified {
571 endpoint_name: endpoint_name.to_owned(),
572 }
573 }
574
575 pub fn output_format_not_specified(endpoint_name: &str) -> Self {
576 Self::OutputFormatNotSpecified {
577 endpoint_name: endpoint_name.to_owned(),
578 }
579 }
580
581 pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
582 Self::InvalidEncoderConfig {
583 endpoint_name: endpoint_name.to_string(),
584 error: error.to_string(),
585 }
586 }
587
588 pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
589 Self::InvalidParserConfig {
590 endpoint_name: endpoint_name.to_string(),
591 error: error.to_string(),
592 }
593 }
594
595 pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
596 Self::InvalidTransportConfig {
597 endpoint_name: endpoint_name.to_string(),
598 error: error.to_string(),
599 }
600 }
601
602 pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
603 Self::InvalidOutputBufferConfig {
604 endpoint_name: endpoint_name.to_string(),
605 error: error.to_string(),
606 }
607 }
608
609 pub fn cyclic_dependency(cycle: Vec<(String, String)>) -> Self {
610 Self::CyclicDependency { cycle }
611 }
612
613 pub fn empty_start_after(endpoint_name: &str) -> Self {
614 Self::EmptyStartAfter {
615 endpoint_name: endpoint_name.to_string(),
616 }
617 }
618}
619
/// Errors reported by the controller.
///
/// The enum is serialized with `#[serde(untagged)]`, i.e. without a variant
/// tag. Variants holding values that need custom handling (`IoError`,
/// `EncodeError`, the transport errors, `StorageError`) are serialized by the
/// dedicated `serialize_*` functions in this file.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum ControllerError {
    /// I/O error.
    #[serde(serialize_with = "serialize_io_error")]
    IoError {
        /// Describes the context where the error occurred.
        context: String,
        /// The underlying I/O error.
        io_error: IoError,
        /// Backtrace captured when the error value was constructed.
        backtrace: Backtrace,
    },

    /// Error parsing the program schema.
    SchemaParseError {
        error: String,
    },

    /// Error validating the program schema.
    SchemaValidationError {
        error: String,
    },

    /// Error parsing a checkpoint file.
    CheckpointParseError {
        error: String,
    },

    /// The pipeline was recovered from a checkpoint that does not match the
    /// current pipeline definition.
    CheckpointDoesNotMatchPipeline,

    /// The operation cannot be initiated because the pipeline is restoring
    /// from a checkpoint.
    RestoreInProgress,

    /// The operation cannot be initiated while the pipeline is
    /// bootstrapping.
    BootstrapInProgress,

    /// Error with persistent input steps.
    StepError(StepError),

    /// A step was read that differs from the expected one.
    UnexpectedStep {
        /// Step that was actually read.
        actual: Step,
        /// Step that was expected.
        expected: Step,
    },

    /// Replay failure; `error` is displayed verbatim.
    ReplayFailure {
        error: String,
    },

    /// The requested operation is not supported.
    NotSupported {
        error: String,
    },

    /// Error parsing the program IR.
    IrParseError {
        error: String,
    },

    /// Error parsing command line arguments.
    CliArgsError {
        error: String,
    },

    /// Invalid controller configuration.
    Config {
        /// The underlying configuration error (boxed).
        config_error: Box<ConfigError>,
    },

    /// Unknown input endpoint name.
    UnknownInputEndpoint {
        endpoint_name: String,
    },

    /// Error creating a preprocessor for an endpoint.
    PreprocessorCreateError {
        endpoint_name: String,
        error: String,
    },

    /// Error creating a postprocessor for an endpoint.
    PostprocessorCreateError {
        endpoint_name: String,
        error: String,
    },

    /// Unknown output endpoint name.
    UnknownOutputEndpoint {
        endpoint_name: String,
    },

    /// Parse error on an input endpoint.
    ParseError {
        endpoint_name: String,
        /// The underlying parse error (boxed).
        error: Box<ParseError>,
    },

    /// Encoder error on an output endpoint.
    #[serde(serialize_with = "serialize_encode_error")]
    EncodeError {
        endpoint_name: String,
        error: AnyError,
    },

    /// Error on an input endpoint's transport.
    #[serde(serialize_with = "serialize_input_transport_error")]
    InputTransportError {
        endpoint_name: String,
        /// When `true`, the message is prefixed with "FATAL ".
        fatal: bool,
        error: AnyError,
    },

    /// Error on an output endpoint's transport.
    #[serde(serialize_with = "serialize_output_transport_error")]
    OutputTransportError {
        endpoint_name: String,
        /// When `true`, the message is prefixed with "FATAL ".
        fatal: bool,
        error: AnyError,
    },

    /// Error executing a command on an output endpoint.
    CommandError {
        endpoint_name: String,
        error: String,
    },

    /// Error reported by DBSP.
    DbspError {
        error: DbspError,
    },

    /// Error in the Prometheus metrics module.
    PrometheusError {
        error: String,
    },

    /// Panic inside the DBSP runtime.
    DbspPanic,

    /// Panic inside the DBSP controller.
    ControllerPanic,

    /// The controller exited before the command could be executed.
    ControllerExit,

    /// Storage error.
    #[serde(serialize_with = "serialize_storage_error")]
    StorageError {
        /// Describes the context where the error occurred.
        context: String,
        /// The underlying storage error.
        error: StorageError,
        /// Backtrace associated with the error (boxed).
        backtrace: Box<Backtrace>,
    },

    /// An enterprise-only feature (named by the payload) was requested in the
    /// community edition.
    EnterpriseFeature(&'static str),

    /// Suspend error; displayed verbatim.
    SuspendError(SuspendError),

    /// An unexpected JSON structure was detected.
    UnexpectedJsonStructure {
        reason: String,
    },

    /// The pipeline was restarted; `error` is displayed verbatim.
    PipelineRestarted {
        error: String,
    },

    /// A completion token refers to an input endpoint id that does not
    /// exist.
    UnknownEndpointInCompletionToken {
        endpoint_id: u64,
    },

    /// Error fetching a checkpoint from the object store.
    CheckpointFetchError {
        error: String,
    },

    /// Error pushing a checkpoint to the object store.
    CheckpointPushError {
        error: String,
    },

    /// The operation cannot be performed while a transaction is in progress.
    TransactionInProgress,
    /// The operation requires an active transaction, but none is in
    /// progress.
    NoTransactionInProgress,

    /// Invalid initial status provided on the command line or read from
    /// storage.
    InvalidInitialStatus(RuntimeDesiredStatus),

    /// Standby mode cannot be entered; the payload gives the reason.
    InvalidStandby(&'static str),

    /// Invalid pipeline startup transition.
    InvalidStartupTransition {
        from: RuntimeDesiredStatus,
        to: RuntimeDesiredStatus,
    },

    /// Bootstrapping of the modified pipeline was rejected by the user.
    BootstrapRejectedByUser,

    /// The pipeline was modified since the last checkpoint but cannot be
    /// bootstrapped; `error` lists the reasons.
    BootstrapNotAllowed {
        error: String,
    },

    /// The query plan requires bootstrapping, but no change was detected in
    /// the pipeline metadata.
    UnexpectedBootstrap {
        bootstrap_info: Option<BootstrapInfo>,
    },

    /// Unexpected runtime version; `error` is displayed verbatim.
    UnexpectedRuntimeVersion {
        error: String,
    },
}
850
851impl ResponseError for ControllerError {
852 fn status_code(&self) -> StatusCode {
853 match self {
854 Self::Config { config_error }
855 if matches!(**config_error, ConfigError::UnknownInputStream { .. }) =>
856 {
857 StatusCode::NOT_FOUND
858 }
859 Self::Config { config_error }
860 if matches!(**config_error, ConfigError::UnknownOutputStream { .. }) =>
861 {
862 StatusCode::NOT_FOUND
863 }
864 Self::Config { .. } => StatusCode::BAD_REQUEST,
865 Self::UnknownInputEndpoint { .. } => StatusCode::NOT_FOUND,
866 Self::UnknownOutputEndpoint { .. } => StatusCode::NOT_FOUND,
867 Self::ParseError { .. } => StatusCode::BAD_REQUEST,
868 Self::NotSupported { .. } => StatusCode::BAD_REQUEST,
869 Self::EnterpriseFeature(_) => StatusCode::NOT_IMPLEMENTED,
870 Self::RestoreInProgress => StatusCode::SERVICE_UNAVAILABLE,
871 Self::BootstrapInProgress => StatusCode::SERVICE_UNAVAILABLE,
872 Self::PipelineRestarted { .. } => StatusCode::GONE,
873 Self::UnknownEndpointInCompletionToken { .. } => StatusCode::GONE,
874 Self::TransactionInProgress => StatusCode::CONFLICT,
875 Self::NoTransactionInProgress => StatusCode::BAD_REQUEST,
876 Self::InvalidInitialStatus(_) => StatusCode::GONE,
877 Self::BootstrapRejectedByUser => StatusCode::CONFLICT,
878 Self::UnexpectedBootstrap { .. } => StatusCode::INTERNAL_SERVER_ERROR,
879 Self::UnexpectedRuntimeVersion { .. } => StatusCode::INTERNAL_SERVER_ERROR,
880 _ => StatusCode::INTERNAL_SERVER_ERROR,
881 }
882 }
883
884 fn error_response(&self) -> HttpResponse<BoxBody> {
885 HttpResponseBuilder::new(self.status_code()).json(ErrorResponse::from_error(self))
886 }
887}
888
889fn serialize_io_error<S>(
890 context: &String,
891 io_error: &IoError,
892 backtrace: &Backtrace,
893 serializer: S,
894) -> Result<S::Ok, S::Error>
895where
896 S: Serializer,
897{
898 let mut ser = serializer.serialize_struct("IoError", 4)?;
899 ser.serialize_field("context", context)?;
900 ser.serialize_field("kind", &io_error.kind().to_string())?;
901 ser.serialize_field("os_error", &io_error.raw_os_error())?;
902 ser.serialize_field("backtrace", &backtrace.to_string())?;
903 ser.end()
904}
905
906fn serialize_storage_error<S>(
907 context: &String,
908 error: &StorageError,
909 backtrace: &Backtrace,
910 serializer: S,
911) -> Result<S::Ok, S::Error>
912where
913 S: Serializer,
914{
915 let mut ser = serializer.serialize_struct("StorageError", 4)?;
916 ser.serialize_field("context", context)?;
917 ser.serialize_field("error", &error.to_string())?;
918 ser.serialize_field("backtrace", &backtrace.to_string())?;
919 ser.end()
920}
921
922fn serialize_encode_error<S>(
923 endpoint: &String,
924 error: &AnyError,
925 serializer: S,
926) -> Result<S::Ok, S::Error>
927where
928 S: Serializer,
929{
930 let mut ser = serializer.serialize_struct("EncodeError", 3)?;
931 ser.serialize_field("endpoint_name", endpoint)?;
932 ser.serialize_field("error", &error.to_string())?;
933 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
934 ser.end()
935}
936
937fn serialize_input_transport_error<S>(
938 endpoint: &String,
939 fatal: &bool,
940 error: &AnyError,
941 serializer: S,
942) -> Result<S::Ok, S::Error>
943where
944 S: Serializer,
945{
946 let mut ser = serializer.serialize_struct("InputTransportError", 4)?;
947 ser.serialize_field("endpoint_name", endpoint)?;
948 ser.serialize_field("fatal", fatal)?;
949 ser.serialize_field("error", &error.to_string())?;
950 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
951 ser.end()
952}
953
954fn serialize_output_transport_error<S>(
955 endpoint: &String,
956 fatal: &bool,
957 error: &AnyError,
958 serializer: S,
959) -> Result<S::Ok, S::Error>
960where
961 S: Serializer,
962{
963 let mut ser = serializer.serialize_struct("OutputTransportError", 4)?;
964 ser.serialize_field("endpoint_name", endpoint)?;
965 ser.serialize_field("fatal", fatal)?;
966 ser.serialize_field("error", &error.to_string())?;
967 ser.serialize_field("backtrace", &error.backtrace().to_string())?;
968 ser.end()
969}
970
971impl DbspDetailedError for ControllerError {
972 fn error_code(&self) -> Cow<'static, str> {
974 match self {
975 Self::PreprocessorCreateError { .. } => Cow::from("PreprocessorCreateError"),
976 Self::PostprocessorCreateError { .. } => Cow::from("PostprocessorCreateError"),
977 Self::IoError { .. } => Cow::from("ControllerIoError"),
978 Self::NotSupported { .. } => Cow::from("NotSupported"),
979 Self::SchemaParseError { .. } => Cow::from("SchemaParseError"),
980 Self::SchemaValidationError { .. } => Cow::from("SchemaParseError"),
981 Self::CheckpointParseError { .. } => Cow::from("CheckpointParseError"),
982 Self::CheckpointDoesNotMatchPipeline => Cow::from("CheckpointDoesNotMatchPipeline"),
983 Self::RestoreInProgress => Cow::from("RestoreInProgress"),
984 Self::BootstrapInProgress => Cow::from("BootstrapInProgress"),
985 Self::StepError { .. } => Cow::from("StepError"),
986 Self::UnexpectedStep { .. } => Cow::from("UnexpectedStep"),
987 Self::ReplayFailure { .. } => Cow::from("ReplayFailure"),
988 Self::IrParseError { .. } => Cow::from("IrParseError"),
989 Self::CliArgsError { .. } => Cow::from("ControllerCliArgsError"),
990 Self::Config { config_error } => {
991 Cow::from(format!("ConfigError.{}", config_error.error_code()))
992 }
993 Self::UnknownInputEndpoint { .. } => Cow::from("UnknownInputEndpoint"),
994 Self::UnknownOutputEndpoint { .. } => Cow::from("UnknownOutputEndpoint"),
995 Self::ParseError { .. } => Cow::from("ParseError"),
996 Self::EncodeError { .. } => Cow::from("EncodeError"),
997 Self::InputTransportError { .. } => Cow::from("InputTransportError"),
998 Self::OutputTransportError { .. } => Cow::from("OutputTransportError"),
999 Self::CommandError { .. } => Cow::from("CommandError"),
1000 Self::PrometheusError { .. } => Cow::from("PrometheusError"),
1001 Self::DbspError { error } => error.error_code(),
1002 Self::DbspPanic => Cow::from("DbspPanic"),
1003 Self::ControllerPanic => Cow::from("ControllerPanic"),
1004 Self::ControllerExit => Cow::from("ControllerExit"),
1005 Self::EnterpriseFeature(_) => Cow::from("EnterpriseFeature"),
1006 Self::StorageError { .. } => Cow::from("StorageError"),
1007 Self::SuspendError(_) => Cow::from("SuspendError"),
1008 Self::UnexpectedJsonStructure { .. } => Cow::from("UnexpectedJsonStructure"),
1009 Self::PipelineRestarted { .. } => Cow::from("PipelineRestarted"),
1010 Self::UnknownEndpointInCompletionToken { .. } => {
1011 Cow::from("UnknownEndpointInCompletionToken")
1012 }
1013 Self::CheckpointFetchError { .. } => Cow::from("CheckpointFetchError"),
1014 Self::CheckpointPushError { .. } => Cow::from("CheckpointPushError"),
1015 Self::TransactionInProgress => Cow::from("TransactionInProgress"),
1016 Self::NoTransactionInProgress => Cow::from("NoTransactionInProgress"),
1017 Self::InvalidInitialStatus(_) => Cow::from("InvalidInitialStatus"),
1018 Self::InvalidStandby(_) => Cow::from("InvalidStandby"),
1019 Self::InvalidStartupTransition { .. } => Cow::from("InvalidStartupTransition"),
1020 Self::BootstrapRejectedByUser => Cow::from("BootstrapRejected"),
1021 Self::BootstrapNotAllowed { .. } => Cow::from("BootstrapNotAllowed"),
1022 Self::UnexpectedBootstrap { .. } => Cow::from("UnexpectedBootstrap"),
1023 Self::UnexpectedRuntimeVersion { .. } => Cow::from("UnexpectedRuntimeVersion"),
1024 }
1025 }
1026}
1027
1028impl DetailedError for ControllerError {
1029 fn error_code(&self) -> Cow<'static, str> {
1031 DbspDetailedError::error_code(self)
1032 }
1033}
1034
// Marker impl: `ControllerError` relies on the default `std::error::Error`
// methods; the `Debug` and `Display` impls it requires are provided in this
// file.
impl StdError for ControllerError {}
1036
1037impl Display for ControllerError {
1038 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
1039 match self {
1040 Self::PreprocessorCreateError {
1041 endpoint_name,
1042 error,
1043 } => {
1044 write!(
1045 f,
1046 "Error creating preprocessor for endpoint {endpoint_name}: {error}"
1047 )
1048 }
1049 Self::PostprocessorCreateError {
1050 endpoint_name,
1051 error,
1052 } => {
1053 write!(
1054 f,
1055 "Error creating postprocessor for endpoint {endpoint_name}: {error}"
1056 )
1057 }
1058 Self::IoError {
1059 context, io_error, ..
1060 } => {
1061 write!(f, "I/O error {context}: {io_error}")
1062 }
1063 Self::NotSupported { error } => {
1064 write!(f, "Not supported: {error}")
1065 }
1066 Self::SchemaParseError { error } => {
1067 write!(f, "Error parsing program schema: {error}")
1068 }
1069 Self::SchemaValidationError { error } => {
1070 write!(f, "Error validating program schema: {error}")
1071 }
1072 Self::CheckpointParseError { error } => {
1073 write!(f, "Error parsing checkpoint file: {error}")
1074 }
1075 Self::CheckpointDoesNotMatchPipeline => {
1076 write!(
1077 f,
1078 "Recovery failed: the pipeline has been recovered from a checkpoint, but the checkpoint does not match the current pipeline definition. This can be caused by a corrupted checkpoint or an internal error."
1079 )
1080 }
1081 Self::RestoreInProgress => {
1082 write!(
1083 f,
1084 "Operation cannot be initiated now because the pipeline is restoring from a checkpoint."
1085 )
1086 }
1087 Self::BootstrapInProgress => {
1088 write!(
1089 f,
1090 "Operation cannot be initiated while the pipeline is bootstrapping."
1091 )
1092 }
1093 Self::StepError(error) => write!(f, "Error with persistent input steps: {error}"),
1094 Self::UnexpectedStep { actual, expected } => {
1095 write!(f, "Read step {actual}, expected {expected}")
1096 }
1097 Self::ReplayFailure { error } => {
1098 write!(f, "{error}")
1099 }
1100 Self::IrParseError { error } => {
1101 write!(f, "Error parsing program IR: {error}")
1102 }
1103 Self::CliArgsError { error } => {
1104 write!(f, "Error parsing command line arguments: {error}")
1105 }
1106 Self::Config { config_error } => {
1107 write!(f, "invalid controller configuration: {config_error}")
1108 }
1109 Self::UnknownInputEndpoint { endpoint_name } => {
1110 write!(f, "unknown input endpoint name '{endpoint_name}'")
1111 }
1112 Self::UnknownOutputEndpoint { endpoint_name } => {
1113 write!(f, "unknown output endpoint name '{endpoint_name}'")
1114 }
1115 Self::InputTransportError {
1116 endpoint_name,
1117 fatal,
1118 error,
1119 } => {
1120 write!(
1121 f,
1122 "{}error on input endpoint '{endpoint_name}': {}",
1123 if *fatal { "FATAL " } else { "" },
1124 error.root_cause()
1125 )
1126 }
1127 Self::OutputTransportError {
1128 endpoint_name,
1129 fatal,
1130 error,
1131 } => {
1132 write!(
1133 f,
1134 "{}error on output endpoint '{endpoint_name}': {}",
1135 if *fatal { "FATAL " } else { "" },
1136 error.root_cause()
1137 )
1138 }
1139 Self::CommandError {
1140 endpoint_name,
1141 error,
1142 } => {
1143 write!(
1144 f,
1145 "error executing command on output endpoint '{endpoint_name}': {error}"
1146 )
1147 }
1148 Self::ParseError {
1149 endpoint_name,
1150 error,
1151 } => {
1152 write!(
1153 f,
1154 "parse error on input endpoint '{endpoint_name}': {error}"
1155 )
1156 }
1157 Self::EncodeError {
1158 endpoint_name,
1159 error,
1160 } => {
1161 write!(
1162 f,
1163 "encoder error on output endpoint '{endpoint_name}': {error}"
1164 )
1165 }
1166 Self::PrometheusError { error } => {
1167 write!(f, "Error in the Prometheus metrics module: '{error}'")
1168 }
1169 Self::DbspError { error } => {
1170 write!(f, "DBSP error: {error}")
1171 }
1172 Self::DbspPanic => {
1173 write!(f, "Panic inside the DBSP runtime")
1174 }
1175 Self::ControllerPanic => {
1176 write!(f, "Panic inside the DBSP controller")
1177 }
1178 Self::ControllerExit => {
1179 write!(f, "Controller exited before command could be executed")
1180 }
1181 Self::EnterpriseFeature(feature) => {
1182 write!(
1183 f,
1184 "Cannot use enterprise-only feature ({feature}) in Feldera community edition."
1185 )
1186 }
1187 Self::StorageError { context, error, .. } => {
1188 write!(f, "I/O error {context}: {error}")
1189 }
1190 Self::SuspendError(error) => write!(f, "{error}"),
1191 Self::UnexpectedJsonStructure { reason } => {
1192 write!(f, "An unexpected JSON structure was detected: {reason}")
1193 }
1194 Self::PipelineRestarted { error } => {
1195 write!(f, "{error}")
1196 }
1197 Self::UnknownEndpointInCompletionToken { endpoint_id } => {
1198 write!(
1199 f,
1200 "completion token specifies input endpoint id {endpoint_id}, which doesn't exist; this indicates that the input connector was deleted after the completion token was generated"
1201 )
1202 }
1203 Self::CheckpointFetchError { error } => {
1204 write!(f, "Error fetching checkpoint from object store: {error}")
1205 }
1206 Self::CheckpointPushError { error } => {
1207 write!(f, "Error pushing checkpoint to object store: {error}")
1208 }
1209 Self::TransactionInProgress => {
1210 write!(
1211 f,
1212 "Cannot perform this operation while there is a transaction in progress"
1213 )
1214 }
1215 Self::NoTransactionInProgress => {
1216 write!(
1217 f,
1218 "This operation requires an active transaction, but none is currently in progress"
1219 )
1220 }
1221 Self::InvalidInitialStatus(status) => {
1222 write!(
1223 f,
1224 "Invalid initial status {status:?} provided on command line or read from storage (only coordination, running, paused, and standby are valid)"
1225 )
1226 }
1227 Self::InvalidStandby(standby) => {
1228 write!(f, "Cannot enter standby mode: {standby}")
1229 }
1230 Self::InvalidStartupTransition { from, to } => {
1231 write!(
1232 f,
1233 "Invalid pipeline startup transition from {from:?} to {to:?}"
1234 )
1235 }
1236 Self::BootstrapRejectedByUser => {
1237 write!(
1238 f,
1239 "Bootstrapping of the modified pipeline was rejected by the user."
1240 )
1241 }
1242 Self::BootstrapNotAllowed { error } => {
1243 write!(
1244 f,
1245 "The pipeline has been modified since the last checkpoint; however it cannot be bootstrapped due to the following reasons:\n{error}"
1246 )
1247 }
1248 Self::UnexpectedBootstrap { bootstrap_info } => {
1249 write!(
1250 f,
1251 "The pipeline's query plan was modified since the last checkpoint and requires bootstrapping; however we were not able to detect the changes in the pipeline metadata. This is a bug; please report to the developers. Bootstrap info: {bootstrap_info:?}"
1252 )
1253 }
1254 Self::UnexpectedRuntimeVersion { error } => {
1255 write!(f, "{error}")
1256 }
1257 }
1258 }
1259}
1260
1261impl ControllerError {
1262 pub fn io_error(context: impl Display, io_error: IoError) -> Self {
1263 Self::IoError {
1264 context: context.to_string(),
1265 io_error,
1266 backtrace: Backtrace::capture(),
1267 }
1268 }
1269
1270 pub fn not_supported(error: &str) -> Self {
1271 Self::NotSupported {
1272 error: error.to_string(),
1273 }
1274 }
1275
1276 pub fn schema_parse_error(error: &str) -> Self {
1277 Self::SchemaParseError {
1278 error: error.to_string(),
1279 }
1280 }
1281
1282 pub fn checkpoint_does_not_match_pipeline() -> Self {
1283 Self::CheckpointDoesNotMatchPipeline
1284 }
1285
1286 pub fn checkpoint_fetch_error(error: String) -> Self {
1287 Self::CheckpointFetchError { error }
1288 }
1289
1290 pub fn checkpoint_push_error(error: String) -> Self {
1291 Self::CheckpointPushError { error }
1292 }
1293
1294 pub fn schema_validation_error(error: &str) -> Self {
1295 Self::SchemaValidationError {
1296 error: error.to_string(),
1297 }
1298 }
1299
1300 pub fn ir_parse_error(error: &str) -> Self {
1301 Self::IrParseError {
1302 error: error.to_string(),
1303 }
1304 }
1305
1306 pub fn cli_args_error<E>(error: &E) -> Self
1307 where
1308 E: ToString,
1309 {
1310 Self::CliArgsError {
1311 error: error.to_string(),
1312 }
1313 }
1314
1315 pub fn unknown_input_endpoint(endpoint_name: &str) -> Self {
1316 Self::UnknownInputEndpoint {
1317 endpoint_name: endpoint_name.to_string(),
1318 }
1319 }
1320
1321 pub fn unknown_output_endpoint(endpoint_name: &str) -> Self {
1322 Self::UnknownOutputEndpoint {
1323 endpoint_name: endpoint_name.to_string(),
1324 }
1325 }
1326
1327 pub fn pipeline_config_parse_error<E>(error: &E) -> Self
1328 where
1329 E: ToString,
1330 {
1331 Self::Config {
1332 config_error: Box::new(ConfigError::pipeline_config_parse_error(error)),
1333 }
1334 }
1335
1336 pub fn parser_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1337 where
1338 E: ToString,
1339 {
1340 Self::Config {
1341 config_error: Box::new(ConfigError::parser_config_parse_error(
1342 endpoint_name,
1343 error,
1344 config,
1345 )),
1346 }
1347 }
1348
1349 pub fn encoder_config_parse_error<E>(endpoint_name: &str, error: &E, config: &str) -> Self
1350 where
1351 E: ToString,
1352 {
1353 Self::Config {
1354 config_error: Box::new(ConfigError::encoder_config_parse_error(
1355 endpoint_name,
1356 error,
1357 config,
1358 )),
1359 }
1360 }
1361
1362 pub fn duplicate_input_endpoint(endpoint_name: &str) -> Self {
1363 Self::Config {
1364 config_error: Box::new(ConfigError::duplicate_input_endpoint(endpoint_name)),
1365 }
1366 }
1367
1368 pub fn duplicate_input_stream(stream_name: &str) -> Self {
1369 Self::Config {
1370 config_error: Box::new(ConfigError::duplicate_input_stream(stream_name)),
1371 }
1372 }
1373
1374 pub fn unknown_input_format(endpoint_name: &str, format_name: &str) -> Self {
1375 Self::Config {
1376 config_error: Box::new(ConfigError::unknown_input_format(
1377 endpoint_name,
1378 format_name,
1379 )),
1380 }
1381 }
1382
1383 pub fn unknown_input_transport(endpoint_name: &str, transport_name: &str) -> Self {
1384 Self::Config {
1385 config_error: Box::new(ConfigError::unknown_input_transport(
1386 endpoint_name,
1387 transport_name,
1388 )),
1389 }
1390 }
1391
1392 pub fn duplicate_output_endpoint(endpoint_name: &str) -> Self {
1393 Self::Config {
1394 config_error: Box::new(ConfigError::duplicate_output_endpoint(endpoint_name)),
1395 }
1396 }
1397
1398 pub fn duplicate_output_stream(stream_name: &str) -> Self {
1399 Self::Config {
1400 config_error: Box::new(ConfigError::duplicate_output_stream(stream_name)),
1401 }
1402 }
1403
1404 pub fn unknown_output_format(endpoint_name: &str, format_name: &str) -> Self {
1405 Self::Config {
1406 config_error: Box::new(ConfigError::unknown_output_format(
1407 endpoint_name,
1408 format_name,
1409 )),
1410 }
1411 }
1412
1413 pub fn unknown_output_transport(endpoint_name: &str, transport_name: &str) -> Self {
1414 Self::Config {
1415 config_error: Box::new(ConfigError::unknown_output_transport(
1416 endpoint_name,
1417 transport_name,
1418 )),
1419 }
1420 }
1421
1422 pub fn unknown_input_stream(endpoint_name: &str, stream_name: &str) -> Self {
1423 Self::Config {
1424 config_error: Box::new(ConfigError::unknown_input_stream(
1425 endpoint_name,
1426 stream_name,
1427 )),
1428 }
1429 }
1430
1431 pub fn unknown_output_stream(endpoint_name: &str, stream_name: &str) -> Self {
1432 Self::Config {
1433 config_error: Box::new(ConfigError::unknown_output_stream(
1434 endpoint_name,
1435 stream_name,
1436 )),
1437 }
1438 }
1439
1440 pub fn unknown_index(endpoint_name: &str, index_name: &str) -> Self {
1441 Self::Config {
1442 config_error: Box::new(ConfigError::unknown_index(endpoint_name, index_name)),
1443 }
1444 }
1445
1446 pub fn not_an_index(endpoint_name: &str, index_name: &str) -> Self {
1447 Self::Config {
1448 config_error: Box::new(ConfigError::not_an_index(endpoint_name, index_name)),
1449 }
1450 }
1451
1452 pub fn input_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1453 Self::Config {
1454 config_error: Box::new(ConfigError::input_format_not_supported(
1455 endpoint_name,
1456 error,
1457 )),
1458 }
1459 }
1460
1461 pub fn output_format_not_supported(endpoint_name: &str, error: &str) -> Self {
1462 Self::Config {
1463 config_error: Box::new(ConfigError::output_format_not_supported(
1464 endpoint_name,
1465 error,
1466 )),
1467 }
1468 }
1469
1470 pub fn input_format_not_specified(endpoint_name: &str) -> Self {
1471 Self::Config {
1472 config_error: Box::new(ConfigError::input_format_not_specified(endpoint_name)),
1473 }
1474 }
1475
1476 pub fn output_format_not_specified(endpoint_name: &str) -> Self {
1477 Self::Config {
1478 config_error: Box::new(ConfigError::output_format_not_specified(endpoint_name)),
1479 }
1480 }
1481
1482 pub fn invalid_encoder_configuration(endpoint_name: &str, error: &str) -> Self {
1483 Self::Config {
1484 config_error: Box::new(ConfigError::invalid_encoder_configuration(
1485 endpoint_name,
1486 error,
1487 )),
1488 }
1489 }
1490
1491 pub fn invalid_parser_configuration(endpoint_name: &str, error: &str) -> Self {
1492 Self::Config {
1493 config_error: Box::new(ConfigError::invalid_parser_configuration(
1494 endpoint_name,
1495 error,
1496 )),
1497 }
1498 }
1499
1500 pub fn invalid_transport_configuration(endpoint_name: &str, error: &str) -> Self {
1501 Self::Config {
1502 config_error: Box::new(ConfigError::invalid_transport_configuration(
1503 endpoint_name,
1504 error,
1505 )),
1506 }
1507 }
1508
1509 pub fn invalid_output_buffer_configuration(endpoint_name: &str, error: &str) -> Self {
1510 Self::Config {
1511 config_error: Box::new(ConfigError::invalid_output_buffer_configuration(
1512 endpoint_name,
1513 error,
1514 )),
1515 }
1516 }
1517
1518 pub fn input_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1519 Self::InputTransportError {
1520 endpoint_name: endpoint_name.to_owned(),
1521 fatal,
1522 error,
1523 }
1524 }
1525
1526 pub fn output_transport_error(endpoint_name: &str, fatal: bool, error: AnyError) -> Self {
1527 Self::OutputTransportError {
1528 endpoint_name: endpoint_name.to_owned(),
1529 fatal,
1530 error,
1531 }
1532 }
1533
1534 pub fn command_error(endpoint_name: &str, error: &str) -> Self {
1535 Self::CommandError {
1536 endpoint_name: endpoint_name.to_owned(),
1537 error: error.to_string(),
1538 }
1539 }
1540
1541 pub fn parse_error(endpoint_name: &str, error: ParseError) -> Self {
1542 Self::ParseError {
1543 endpoint_name: endpoint_name.to_owned(),
1544 error: Box::new(error),
1545 }
1546 }
1547
1548 pub fn encode_error(endpoint_name: &str, error: AnyError) -> Self {
1549 Self::EncodeError {
1550 endpoint_name: endpoint_name.to_owned(),
1551 error,
1552 }
1553 }
1554
1555 pub fn prometheus_error<E>(error: &E) -> Self
1556 where
1557 E: ToString,
1558 {
1559 Self::PrometheusError {
1560 error: error.to_string(),
1561 }
1562 }
1563
1564 pub fn dbsp_error(error: DbspError) -> Self {
1565 Self::DbspError { error }
1566 }
1567
1568 pub fn dbsp_panic() -> Self {
1569 Self::DbspPanic
1570 }
1571
1572 pub fn controller_panic() -> Self {
1573 Self::ControllerPanic
1574 }
1575
1576 pub fn storage_error(context: impl Display, error: StorageError) -> Self {
1577 Self::StorageError {
1578 context: context.to_string(),
1579 error,
1580 backtrace: Box::new(Backtrace::capture()),
1581 }
1582 }
1583
1584 pub fn pipeline_restarted(error: &str) -> Self {
1585 Self::PipelineRestarted {
1586 error: error.to_string(),
1587 }
1588 }
1589
1590 pub fn unknown_endpoint_in_completion_token(endpoint_id: u64) -> Self {
1591 Self::UnknownEndpointInCompletionToken { endpoint_id }
1592 }
1593
1594 pub fn kind(&self) -> ErrorKind {
1595 match self {
1596 Self::IoError { io_error, .. } => io_error.kind(),
1597 Self::StorageError { error, .. } => error.kind(),
1598 Self::StepError(error) => error.kind(),
1599 Self::RestoreInProgress
1600 | Self::BootstrapInProgress
1601 | Self::SuspendError(SuspendError::Temporary(_))
1602 | Self::TransactionInProgress => ErrorKind::ResourceBusy,
1603 Self::NotSupported { .. } | Self::SuspendError(SuspendError::Permanent(_)) => {
1604 ErrorKind::Unsupported
1605 }
1606 Self::DbspError {
1607 error: DbspError::IO(error),
1608 } => error.kind(),
1609 Self::DbspError { .. }
1610 | Self::SchemaParseError { .. }
1611 | Self::SchemaValidationError { .. }
1612 | Self::CheckpointParseError { .. }
1613 | Self::CheckpointDoesNotMatchPipeline
1614 | Self::UnexpectedStep { .. }
1615 | Self::ReplayFailure { .. }
1616 | Self::IrParseError { .. }
1617 | Self::CliArgsError { .. }
1618 | Self::Config { .. }
1619 | Self::UnknownInputEndpoint { .. }
1620 | Self::UnknownOutputEndpoint { .. }
1621 | Self::ParseError { .. }
1622 | Self::EncodeError { .. }
1623 | Self::InputTransportError { .. }
1624 | Self::OutputTransportError { .. }
1625 | Self::CommandError { .. }
1626 | Self::PrometheusError { .. }
1627 | Self::DbspPanic
1628 | Self::ControllerPanic
1629 | Self::ControllerExit
1630 | Self::EnterpriseFeature(_)
1631 | Self::UnexpectedJsonStructure { .. }
1632 | Self::UnknownEndpointInCompletionToken { .. }
1633 | Self::CheckpointFetchError { .. }
1634 | Self::CheckpointPushError { .. }
1635 | Self::PipelineRestarted { .. }
1636 | Self::NoTransactionInProgress
1637 | Self::InvalidInitialStatus(_)
1638 | Self::BootstrapRejectedByUser
1639 | Self::BootstrapNotAllowed { .. }
1640 | Self::UnexpectedBootstrap { .. }
1641 | Self::InvalidStandby(_)
1642 | Self::InvalidStartupTransition { .. }
1643 | Self::PreprocessorCreateError { .. }
1644 | Self::PostprocessorCreateError { .. }
1645 | Self::UnexpectedRuntimeVersion { .. } => ErrorKind::Other,
1646 }
1647 }
1648}
1649
1650impl From<ConfigError> for ControllerError {
1651 fn from(config_error: ConfigError) -> Self {
1652 Self::Config {
1653 config_error: Box::new(config_error),
1654 }
1655 }
1656}
1657
1658impl From<DbspError> for ControllerError {
1659 fn from(error: DbspError) -> Self {
1660 Self::DbspError { error }
1661 }
1662}
1663
1664impl From<SuspendError> for ControllerError {
1665 fn from(error: SuspendError) -> Self {
1666 Self::SuspendError(error)
1667 }
1668}