dynamo_runtime/pipeline/
error.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
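//! Pipeline error types.
//!
//! Re-exports the commonly used `anyhow` items and defines [`PipelineError`],
//! [`TwoPartCodecError`], and the [`PipelineErrorExt`] downcast helpers.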
6use async_nats::error::Error as NatsError;
7
8pub use anyhow::{Context, Error, Result, anyhow, anyhow as error, bail, ensure};
9
10pub trait PipelineErrorExt {
11    /// Downcast the [`Error`] to a [`PipelineError`]
12    fn try_into_pipeline_error(self) -> Result<PipelineError, Error>;
13
14    /// If the [`Error`] can be downcast to a [`PipelineError`], then the left variant is returned,
15    /// otherwise the right variant is returned.
16    fn either_pipeline_error(self) -> either::Either<PipelineError, Error>;
17}
18
19impl PipelineErrorExt for Error {
20    fn try_into_pipeline_error(self) -> Result<PipelineError, Error> {
21        self.downcast::<PipelineError>()
22    }
23
24    fn either_pipeline_error(self) -> either::Either<PipelineError, Error> {
25        match self.downcast::<PipelineError>() {
26            Ok(err) => either::Left(err),
27            Err(err) => either::Right(err),
28        }
29    }
30}
31
32#[derive(Debug, thiserror::Error)]
33pub enum PipelineError {
34    /// For starter, to remove as code matures.
35    #[error("Generic error: {0}")]
36    Generic(String),
37
38    /// Edges can only be set once. This error is thrown on subsequent attempts to set an edge.s
39    #[error("Link failed: Edge already set")]
40    EdgeAlreadySet,
41
42    /// The source node is not connected to an edge.
43    #[error("Disconnected source; no edge on which to send data")]
44    NoEdge,
45
46    #[error("SegmentSink is not connected to an EgressPort")]
47    NoNetworkEdge,
48
49    /// In the interim between when a request was made and when the stream was received, the
50    /// requesting task was dropped. This maybe a logic error in the pipeline; and become a
51    /// panic/fatal error in the future. This error is thrown when the `on_data` method of a
52    /// terminating sink either cannot find the `oneshot` channel sender or the corresponding
53    /// receiver was dropped
54    #[error("Unlinked request; initiating request task was dropped or cancelled")]
55    DetachedStreamReceiver,
56
57    // In the interim between when a response was made and when the stream was received, the
58    // Sender for the stream was dropped. This maybe a logic error in the pipeline; and become a
59    // panic/fatal error in the future.
60    #[error("Unlinked response; response task was dropped or cancelled")]
61    DetachedStreamSender,
62
63    #[error("Serialzation Error: {0}")]
64    SerializationError(String),
65
66    #[error("Deserialization Error: {0}")]
67    DeserializationError(String),
68
69    #[error("Failed to issue request to the control plane: {0}")]
70    ControlPlaneRequestError(String),
71
72    #[error("Failed to establish a streaming connection: {0}")]
73    ConnectionFailed(String),
74
75    #[error("Generate Error: {0}")]
76    GenerateError(Error),
77
78    #[error("An endpoint URL must have the format: namespace/component/endpoint")]
79    InvalidEndpointFormat,
80
81    #[error("NATS Request Error: {0}")]
82    NatsRequestError(#[from] NatsError<async_nats::jetstream::context::RequestErrorKind>),
83
84    #[error("NATS Get Stream Error: {0}")]
85    NatsGetStreamError(#[from] NatsError<async_nats::jetstream::context::GetStreamErrorKind>),
86
87    #[error("NATS Create Stream Error: {0}")]
88    NatsCreateStreamError(#[from] NatsError<async_nats::jetstream::context::CreateStreamErrorKind>),
89
90    #[error("NATS Consumer Error: {0}")]
91    NatsConsumerError(#[from] NatsError<async_nats::jetstream::stream::ConsumerErrorKind>),
92
93    #[error("NATS Batch Error: {0}")]
94    NatsBatchError(#[from] NatsError<async_nats::jetstream::consumer::pull::BatchErrorKind>),
95
96    #[error("NATS Publish Error: {0}")]
97    NatsPublishError(#[from] NatsError<async_nats::client::PublishErrorKind>),
98
99    #[error("NATS Connect Error: {0}")]
100    NatsConnectError(#[from] NatsError<async_nats::ConnectErrorKind>),
101
102    #[error("NATS Subscriber Error: {0}")]
103    NatsSubscriberError(#[from] async_nats::SubscribeError),
104
105    #[error("Local IP Address Error: {0}")]
106    LocalIpAddressError(#[from] local_ip_address::Error),
107
108    #[error("Prometheus Error: {0}")]
109    PrometheusError(#[from] prometheus::Error),
110
111    #[error("Other NATS Error: {0}")]
112    NatsError(#[from] Box<dyn std::error::Error + Send + Sync>),
113
114    #[error("Two Part Codec Error: {0}")]
115    TwoPartCodec(#[from] TwoPartCodecError),
116
117    #[error("Serde Json Error: {0}")]
118    SerdeJsonError(#[from] serde_json::Error),
119
120    #[error("NATS KV Err: {0} for bucket '{1}")]
121    KeyValueError(String, String),
122
123    /// All instances are busy and cannot handle new requests
124    #[error("Service temporarily unavailable: {0}")]
125    ServiceOverloaded(String),
126}
127
128#[derive(Debug, thiserror::Error)]
129pub enum TwoPartCodecError {
130    #[error("I/O error: {0}")]
131    Io(#[from] std::io::Error),
132
133    #[error("Message size {0} exceeds the maximum allowed size of {1} bytes")]
134    MessageTooLarge(usize, usize),
135
136    #[error("Invalid message: {0}")]
137    InvalidMessage(String),
138
139    #[error("Checksum mismatch")]
140    ChecksumMismatch,
141}