swimos_api/error/
mod.rs

1// Copyright 2015-2024 Swim Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # SwimOS API error types
16
17use std::io;
18use std::{error::Error, sync::Arc};
19
20use swimos_form::read::ReadError;
21use swimos_model::Text;
22use swimos_recon::parser::AsyncParseError;
23use swimos_utilities::trigger::TriggerError;
24use swimos_utilities::{errors::Recoverable, routing::UnapplyError, trigger::promise};
25use thiserror::Error;
26use tokio::sync::{mpsc, oneshot, watch};
27
28use crate::agent::WarpLaneKind;
29use crate::{address::RelativeAddress, agent::StoreKind};
30
31mod introspection;
32
33pub use introspection::{IntrospectionStopped, LaneIntrospectionError, NodeIntrospectionError};
34
35/// Indicates that an agent or downlink failed to read a frame from a byte stream.
36#[derive(Error, Debug)]
37pub enum FrameIoError {
38    #[error("{0}")]
39    Io(#[from] std::io::Error),
40    #[error("{0}")]
41    BadFrame(#[from] InvalidFrame),
42    #[error("The stream terminated when a frame was expected.")]
43    InvalidTermination,
44}
45
46impl From<AsyncParseError> for FrameIoError {
47    fn from(e: AsyncParseError) -> Self {
48        FrameIoError::BadFrame(InvalidFrame::InvalidMessageBody(e))
49    }
50}
51
52/// Indicates that the content of a frame was invalid.
53#[derive(Error, Debug)]
54pub enum InvalidFrame {
55    #[error("An incoming frame was incomplete.")]
56    Incomplete,
57    #[error("Invalid frame header: {problem}")]
58    InvalidHeader { problem: Text },
59    #[error("Invalid frame body: {0}")]
60    InvalidMessageBody(#[from] AsyncParseError),
61}
62
63/// Possible failure modes for a downlink consumer.
64#[derive(Error, Debug)]
65pub enum DownlinkTaskError {
66    #[error("The downlink failed to start.")]
67    FailedToStart,
68    #[error("A synced envelope was received with no data provided.")]
69    SyncedWithNoValue,
70    #[error("{0}")]
71    BadFrame(#[from] FrameIoError),
72    #[error("Failed to deserialize frame body: {0}")]
73    DeserializationFailed(#[from] ReadError),
74    #[error("{0:?}")]
75    Custom(Box<dyn Error + Send + Sync + 'static>),
76}
77
78#[derive(Error, Debug, Clone)]
79pub enum DownlinkFailureReason {
80    #[error("The specified remote URL was not valid.")]
81    InvalidUrl,
82    #[error("The lane was unresolvable: {0}")]
83    UnresolvableRemote(Arc<std::io::Error>),
84    #[error("A local lane was not resolvable: {0}")]
85    UnresolvableLocal(RelativeAddress<Text>),
86    #[error("Connection to the remote host failed: {0}")]
87    ConnectionFailed(Arc<std::io::Error>),
88    #[error("Failed to negotiate a TLS connection: {message}")]
89    TlsConnectionFailed { message: String, recoverable: bool },
90    #[error("Could not negotiate a websocket connection: {0}")]
91    WebsocketNegotiationFailed(String),
92    #[error("The remote client stopped while the downlink was starting.")]
93    RemoteStopped,
94    #[error("The downlink runtime task stopped during attachment.")]
95    DownlinkStopped,
96}
97
98/// Error type for operations that communicate with the agent runtime.
99#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
100pub enum AgentRuntimeError {
101    #[error("The agent runtime is stopping.")]
102    Stopping,
103    #[error("The agent runtime has terminated.")]
104    Terminated,
105}
106
107/// Error type for registering point to point command channels.
108#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
109pub enum CommanderRegistrationError {
110    /// The registration could not be completed because the agent is stopping.
111    #[error(transparent)]
112    RuntimeError(#[from] AgentRuntimeError),
113    /// The limit on the number of registrations for the agent was exceeded.
114    #[error("Too many commander IDs were requested for the agent.")]
115    CommanderIdOverflow,
116}
117
118impl<T> From<mpsc::error::SendError<T>> for CommanderRegistrationError {
119    fn from(_: mpsc::error::SendError<T>) -> Self {
120        CommanderRegistrationError::RuntimeError(AgentRuntimeError::Terminated)
121    }
122}
123
124impl From<TriggerError> for CommanderRegistrationError {
125    fn from(_: TriggerError) -> Self {
126        CommanderRegistrationError::RuntimeError(AgentRuntimeError::Terminated)
127    }
128}
129
130/// Error type for the operation of spawning a new downlink on the runtime.
131#[derive(Error, Debug, Clone)]
132pub enum DownlinkRuntimeError {
133    #[error(transparent)]
134    RuntimeError(#[from] AgentRuntimeError),
135    #[error("Opening a new downlink failed: {0}")]
136    DownlinkConnectionFailed(DownlinkFailureReason),
137}
138
139/// Error type for requests so the runtime for creating/opening a state for an agent.
140#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
141pub enum OpenStoreError {
142    #[error(transparent)]
143    RuntimeError(#[from] AgentRuntimeError),
144    #[error("The runtime does not support stores.")]
145    StoresNotSupported,
146    #[error(
147        "A store of kind {requested} was requested but the pre-existing store is of kind {actual}."
148    )]
149    IncorrectStoreKind {
150        requested: StoreKind,
151        actual: StoreKind,
152    },
153}
154
155/// Error type for requests to a running agent to register a new lane.
156#[derive(Clone, Debug, Error, PartialEq, Eq)]
157pub enum DynamicRegistrationError {
158    #[error("This agent does not support dynamically registered items.")]
159    DynamicRegistrationsNotSupported,
160    #[error("This agent only supports dynamic registration during initialization.")]
161    AfterInitialization,
162    #[error("The requested item name '{0}' is already in use.")]
163    DuplicateName(String),
164    #[error("This agent does not support dynamically adding lanes of type: {0}")]
165    LaneKindUnsupported(WarpLaneKind),
166    #[error("This agent does not support dynamically adding stores of type: {0}")]
167    StoreKindUnsupported(StoreKind),
168    #[error("This agent does not support dynamically adding HTTP lanes.")]
169    HttpLanesUnsupported,
170}
171
172/// Error type for an operation to add a new lane to a running agent.
173#[derive(Clone, Debug, Error, PartialEq, Eq)]
174pub enum LaneSpawnError {
175    /// The agent runtime stopped before the request could be completed,
176    #[error(transparent)]
177    Runtime(#[from] AgentRuntimeError),
178    /// The agent refused to register the lane.
179    #[error(transparent)]
180    Registration(#[from] DynamicRegistrationError),
181}
182
183impl<T> From<mpsc::error::SendError<T>> for AgentRuntimeError {
184    fn from(_: mpsc::error::SendError<T>) -> Self {
185        AgentRuntimeError::Terminated
186    }
187}
188
189impl<T> From<mpsc::error::SendError<T>> for OpenStoreError {
190    fn from(_: mpsc::error::SendError<T>) -> Self {
191        OpenStoreError::RuntimeError(AgentRuntimeError::Terminated)
192    }
193}
194
195impl<T> From<watch::error::SendError<T>> for AgentRuntimeError {
196    fn from(_: watch::error::SendError<T>) -> Self {
197        AgentRuntimeError::Terminated
198    }
199}
200
201impl From<promise::PromiseError> for AgentRuntimeError {
202    fn from(_: promise::PromiseError) -> Self {
203        AgentRuntimeError::Terminated
204    }
205}
206
207impl From<oneshot::error::RecvError> for AgentRuntimeError {
208    fn from(_: oneshot::error::RecvError) -> Self {
209        AgentRuntimeError::Terminated
210    }
211}
212
213impl From<oneshot::error::RecvError> for OpenStoreError {
214    fn from(_: oneshot::error::RecvError) -> Self {
215        OpenStoreError::RuntimeError(AgentRuntimeError::Terminated)
216    }
217}
218
219impl<T> From<mpsc::error::SendError<T>> for DownlinkRuntimeError {
220    fn from(_: mpsc::error::SendError<T>) -> Self {
221        DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
222    }
223}
224
225impl<T> From<watch::error::SendError<T>> for DownlinkRuntimeError {
226    fn from(_: watch::error::SendError<T>) -> Self {
227        DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
228    }
229}
230
231impl From<promise::PromiseError> for DownlinkRuntimeError {
232    fn from(_: promise::PromiseError) -> Self {
233        DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
234    }
235}
236
237impl From<oneshot::error::RecvError> for DownlinkRuntimeError {
238    fn from(_: oneshot::error::RecvError) -> Self {
239        DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
240    }
241}
242
243impl Recoverable for DownlinkFailureReason {
244    fn is_fatal(&self) -> bool {
245        match self {
246            DownlinkFailureReason::InvalidUrl => true,
247            DownlinkFailureReason::UnresolvableRemote(_) => false,
248            DownlinkFailureReason::ConnectionFailed(_) => false,
249            DownlinkFailureReason::WebsocketNegotiationFailed(_) => false,
250            DownlinkFailureReason::RemoteStopped => false,
251            DownlinkFailureReason::DownlinkStopped => false,
252            DownlinkFailureReason::UnresolvableLocal(_) => true,
253            DownlinkFailureReason::TlsConnectionFailed { recoverable, .. } => !recoverable,
254        }
255    }
256}
257
258impl Recoverable for DownlinkRuntimeError {
259    fn is_fatal(&self) -> bool {
260        match self {
261            DownlinkRuntimeError::RuntimeError(_) => true,
262            DownlinkRuntimeError::DownlinkConnectionFailed(err) => err.is_fatal(),
263        }
264    }
265}
266
267/// Error type is returned by implementations of the agent interface trait.
268#[derive(Error, Debug)]
269pub enum AgentTaskError {
270    #[error("Bad frame for lane '{lane}': {error}")]
271    BadFrame { lane: Text, error: FrameIoError },
272    #[error("Failed to deserialize frame body: {0}")]
273    DeserializationFailed(#[from] ReadError),
274    #[error("Error in use code (likely an event handler): {0}")]
275    UserCodeError(Box<dyn std::error::Error + Send>),
276    #[error("The agent failed to generate a required output: {0}")]
277    OutputFailed(std::io::Error),
278    #[error("Attempting to register a commander failed.")]
279    CommanderRegistrationFailed,
280}
281
282/// Error type that is returned by implementations of the agent interface trait during the initialization phase.
283#[derive(Error, Debug)]
284pub enum AgentInitError {
285    #[error("The agent failed to start.")]
286    FailedToStart,
287    #[error("Multiple lanes with the same name: {0}")]
288    DuplicateLane(Text),
289    #[error("Error in use code (likely an event handler): {0}")]
290    UserCodeError(Box<dyn std::error::Error + Send>),
291    #[error("Initializing the state of an agent lane failed: {0}")]
292    LaneInitializationFailure(FrameIoError),
293    #[error("Attempting to dynamically register a lane failed: {0}")]
294    RegistrationFailure(#[from] DynamicRegistrationError),
295    #[error(
296        "Requested a store of kind {requested} for item {name} but store was of kind {actual}."
297    )]
298    IncorrectStoreKind {
299        name: Text,
300        requested: StoreKind,
301        actual: StoreKind,
302    },
303    #[error("The parameters passed to the agent were invalid: {0}")]
304    InvalidParameters(#[from] UnapplyError),
305    #[error("Failed to initialize introspection for an agent: {0}")]
306    NodeIntrospection(#[from] NodeIntrospectionError),
307    #[error("Failed to initialize introspection for a labe: {0}")]
308    LaneIntrospection(#[from] LaneIntrospectionError),
309}
310
311//TODO Make this more sophisticated.
312impl From<AgentRuntimeError> for AgentInitError {
313    fn from(_: AgentRuntimeError) -> Self {
314        AgentInitError::FailedToStart
315    }
316}
317
318#[derive(Debug, Error)]
319pub enum StoreError {
320    /// This implementation does not provide stores.
321    #[error("No store available.")]
322    NoStoreAvailable,
323    /// The provided key was not found in the store.
324    #[error("The specified key was not found")]
325    KeyNotFound,
326    /// A key returned by the store did not have the correct format.
327    #[error("The store returned an invalid key")]
328    InvalidKey,
329    /// Invalid operation attempted.
330    #[error("An invalid operation was attempted (e.g. Updating a map entry on a value entry)")]
331    InvalidOperation,
332    /// The delegate byte engine failed to initialise.
333    #[error("The delegate store engine failed to initialise: {0}")]
334    InitialisationFailure(String),
335    /// An IO error produced by the delegate byte engine.
336    #[error("IO error: {0}")]
337    Io(#[from] io::Error),
338    /// An error produced when attempting to encode a value.
339    #[error("Encoding error: {0}")]
340    Encoding(String),
341    /// An error produced when attempting to decode a value.
342    #[error("Decoding error: {0}")]
343    Decoding(String),
344    /// A raw error produced by the delegate byte engine.
345    #[error("Delegate store error: {0}")]
346    Delegate(Box<dyn std::error::Error + Send + Sync>),
347    /// A raw error produced by the delegate byte engine that isn't send or sync
348    #[error("Delegate store error: {0}")]
349    DelegateMessage(String),
350    /// An operation was attempted on the byte engine when it was closing.
351    #[error("An operation was attempted on the delegate store engine when it was closing")]
352    Closing,
353    /// The requested keyspace was not found.
354    #[error("The requested keyspace was not found")]
355    KeyspaceNotFound,
356}
357
358impl StoreError {
359    pub fn downcast_ref<E: std::error::Error + 'static>(&self) -> Option<&E> {
360        match self {
361            StoreError::Delegate(d) => {
362                if let Some(downcasted) = d.downcast_ref() {
363                    return Some(downcasted);
364                }
365                None
366            }
367            _ => None,
368        }
369    }
370}
371
372impl PartialEq for StoreError {
373    fn eq(&self, other: &Self) -> bool {
374        match (self, other) {
375            (StoreError::KeyNotFound, StoreError::KeyNotFound) => true,
376            (StoreError::InitialisationFailure(l), StoreError::InitialisationFailure(r)) => l.eq(r),
377            (StoreError::Io(l), StoreError::Io(r)) => l.kind().eq(&r.kind()),
378            (StoreError::Encoding(l), StoreError::Encoding(r)) => l.eq(r),
379            (StoreError::Decoding(l), StoreError::Decoding(r)) => l.eq(r),
380            (StoreError::Delegate(l), StoreError::Delegate(r)) => l.to_string().eq(&r.to_string()),
381            (StoreError::DelegateMessage(l), StoreError::DelegateMessage(r)) => l.eq(r),
382            (StoreError::Closing, StoreError::Closing) => true,
383            (StoreError::KeyspaceNotFound, StoreError::KeyspaceNotFound) => true,
384            _ => false,
385        }
386    }
387}