dynamo_runtime/pipeline/
error.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Pipeline Error
17//
18use async_nats::error::Error as NatsError;
19
20pub use anyhow::{anyhow, anyhow as error, bail, ensure, Context, Error, Result};
21
22pub trait PipelineErrorExt {
23    /// Downcast the [`Error`] to a [`PipelineError`]
24    fn try_into_pipeline_error(self) -> Result<PipelineError, Error>;
25
26    /// If the [`Error`] can be downcast to a [`PipelineError`], then the left variant is returned,
27    /// otherwise the right variant is returned.
28    fn either_pipeline_error(self) -> either::Either<PipelineError, Error>;
29}
30
31impl PipelineErrorExt for Error {
32    fn try_into_pipeline_error(self) -> Result<PipelineError, Error> {
33        self.downcast::<PipelineError>()
34    }
35
36    fn either_pipeline_error(self) -> either::Either<PipelineError, Error> {
37        match self.downcast::<PipelineError>() {
38            Ok(err) => either::Left(err),
39            Err(err) => either::Right(err),
40        }
41    }
42}
43
44#[derive(Debug, thiserror::Error)]
45pub enum PipelineError {
46    /// For starter, to remove as code matures.
47    #[error("Generic error: {0}")]
48    Generic(String),
49
50    /// Edges can only be set once. This error is thrown on subsequent attempts to set an edge.s
51    #[error("Link failed: Edge already set")]
52    EdgeAlreadySet,
53
54    /// The source node is not connected to an edge.
55    #[error("Disconnected source; no edge on which to send data")]
56    NoEdge,
57
58    #[error("SegmentSink is not connected to an EgressPort")]
59    NoNetworkEdge,
60
61    /// In the interim between when a request was made and when the stream was received, the
62    /// requesting task was dropped. This maybe a logic error in the pipeline; and become a
63    /// panic/fatal error in the future. This error is thrown when the `on_data` method of a
64    /// terminating sink either cannot find the `oneshot` channel sender or the corresponding
65    /// receiver was dropped
66    #[error("Unlinked request; initiating request task was dropped or cancelled")]
67    DetachedStreamReceiver,
68
69    // In the interim between when a response was made and when the stream was received, the
70    // Sender for the stream was dropped. This maybe a logic error in the pipeline; and become a
71    // panic/fatal error in the future.
72    #[error("Unlinked response; response task was dropped or cancelled")]
73    DetachedStreamSender,
74
75    #[error("Serialzation Error: {0}")]
76    SerializationError(String),
77
78    #[error("Deserialization Error: {0}")]
79    DeserializationError(String),
80
81    #[error("Failed to issue request to the control plane: {0}")]
82    ControlPlaneRequestError(String),
83
84    #[error("Failed to establish a streaming connection: {0}")]
85    ConnectionFailed(String),
86
87    #[error("Generate Error: {0}")]
88    GenerateError(Error),
89
90    #[error("An endpoint URL must have the format: namespace/component/endpoint")]
91    InvalidEndpointFormat,
92
93    #[error("NATS Request Error: {0}")]
94    NatsRequestError(#[from] NatsError<async_nats::jetstream::context::RequestErrorKind>),
95
96    #[error("NATS Get Stream Error: {0}")]
97    NatsGetStreamError(#[from] NatsError<async_nats::jetstream::context::GetStreamErrorKind>),
98
99    #[error("NATS Create Stream Error: {0}")]
100    NatsCreateStreamError(#[from] NatsError<async_nats::jetstream::context::CreateStreamErrorKind>),
101
102    #[error("NATS Consumer Error: {0}")]
103    NatsConsumerError(#[from] NatsError<async_nats::jetstream::stream::ConsumerErrorKind>),
104
105    #[error("NATS Batch Error: {0}")]
106    NatsBatchError(#[from] NatsError<async_nats::jetstream::consumer::pull::BatchErrorKind>),
107
108    #[error("NATS Publish Error: {0}")]
109    NatsPublishError(#[from] NatsError<async_nats::client::PublishErrorKind>),
110
111    #[error("NATS Connect Error: {0}")]
112    NatsConnectError(#[from] NatsError<async_nats::ConnectErrorKind>),
113
114    #[error("NATS Subscriber Error: {0}")]
115    NatsSubscriberError(#[from] async_nats::SubscribeError),
116
117    #[error("Local IP Address Error: {0}")]
118    LocalIpAddressError(#[from] local_ip_address::Error),
119
120    #[error("Prometheus Error: {0}")]
121    PrometheusError(#[from] prometheus::Error),
122
123    #[error("Other NATS Error: {0}")]
124    NatsError(#[from] Box<dyn std::error::Error + Send + Sync>),
125
126    #[error("Two Part Codec Error: {0}")]
127    TwoPartCodec(#[from] TwoPartCodecError),
128
129    #[error("Serde Json Error: {0}")]
130    SerdeJsonError(#[from] serde_json::Error),
131
132    #[error("NATS KV Err: {0} for bucket '{1}")]
133    KeyValueError(String, String),
134
135    /// All instances are busy and cannot handle new requests
136    #[error("Service temporarily unavailable: {0}")]
137    ServiceOverloaded(String),
138}
139
140#[derive(Debug, thiserror::Error)]
141pub enum TwoPartCodecError {
142    #[error("I/O error: {0}")]
143    Io(#[from] std::io::Error),
144
145    #[error("Message size {0} exceeds the maximum allowed size of {1} bytes")]
146    MessageTooLarge(usize, usize),
147
148    #[error("Invalid message: {0}")]
149    InvalidMessage(String),
150
151    #[error("Checksum mismatch")]
152    ChecksumMismatch,
153}