1use std::io;
18use std::{error::Error, sync::Arc};
19
20use swimos_form::read::ReadError;
21use swimos_model::Text;
22use swimos_recon::parser::AsyncParseError;
23use swimos_utilities::trigger::TriggerError;
24use swimos_utilities::{errors::Recoverable, routing::UnapplyError, trigger::promise};
25use thiserror::Error;
26use tokio::sync::{mpsc, oneshot, watch};
27
28use crate::agent::WarpLaneKind;
29use crate::{address::RelativeAddress, agent::StoreKind};
30
31mod introspection;
32
33pub use introspection::{IntrospectionStopped, LaneIntrospectionError, NodeIntrospectionError};
34
35#[derive(Error, Debug)]
37pub enum FrameIoError {
38 #[error("{0}")]
39 Io(#[from] std::io::Error),
40 #[error("{0}")]
41 BadFrame(#[from] InvalidFrame),
42 #[error("The stream terminated when a frame was expected.")]
43 InvalidTermination,
44}
45
46impl From<AsyncParseError> for FrameIoError {
47 fn from(e: AsyncParseError) -> Self {
48 FrameIoError::BadFrame(InvalidFrame::InvalidMessageBody(e))
49 }
50}
51
52#[derive(Error, Debug)]
54pub enum InvalidFrame {
55 #[error("An incoming frame was incomplete.")]
56 Incomplete,
57 #[error("Invalid frame header: {problem}")]
58 InvalidHeader { problem: Text },
59 #[error("Invalid frame body: {0}")]
60 InvalidMessageBody(#[from] AsyncParseError),
61}
62
63#[derive(Error, Debug)]
65pub enum DownlinkTaskError {
66 #[error("The downlink failed to start.")]
67 FailedToStart,
68 #[error("A synced envelope was received with no data provided.")]
69 SyncedWithNoValue,
70 #[error("{0}")]
71 BadFrame(#[from] FrameIoError),
72 #[error("Failed to deserialize frame body: {0}")]
73 DeserializationFailed(#[from] ReadError),
74 #[error("{0:?}")]
75 Custom(Box<dyn Error + Send + Sync + 'static>),
76}
77
78#[derive(Error, Debug, Clone)]
79pub enum DownlinkFailureReason {
80 #[error("The specified remote URL was not valid.")]
81 InvalidUrl,
82 #[error("The lane was unresolvable: {0}")]
83 UnresolvableRemote(Arc<std::io::Error>),
84 #[error("A local lane was not resolvable: {0}")]
85 UnresolvableLocal(RelativeAddress<Text>),
86 #[error("Connection to the remote host failed: {0}")]
87 ConnectionFailed(Arc<std::io::Error>),
88 #[error("Failed to negotiate a TLS connection: {message}")]
89 TlsConnectionFailed { message: String, recoverable: bool },
90 #[error("Could not negotiate a websocket connection: {0}")]
91 WebsocketNegotiationFailed(String),
92 #[error("The remote client stopped while the downlink was starting.")]
93 RemoteStopped,
94 #[error("The downlink runtime task stopped during attachment.")]
95 DownlinkStopped,
96}
97
98#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
100pub enum AgentRuntimeError {
101 #[error("The agent runtime is stopping.")]
102 Stopping,
103 #[error("The agent runtime has terminated.")]
104 Terminated,
105}
106
107#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
109pub enum CommanderRegistrationError {
110 #[error(transparent)]
112 RuntimeError(#[from] AgentRuntimeError),
113 #[error("Too many commander IDs were requested for the agent.")]
115 CommanderIdOverflow,
116}
117
118impl<T> From<mpsc::error::SendError<T>> for CommanderRegistrationError {
119 fn from(_: mpsc::error::SendError<T>) -> Self {
120 CommanderRegistrationError::RuntimeError(AgentRuntimeError::Terminated)
121 }
122}
123
124impl From<TriggerError> for CommanderRegistrationError {
125 fn from(_: TriggerError) -> Self {
126 CommanderRegistrationError::RuntimeError(AgentRuntimeError::Terminated)
127 }
128}
129
130#[derive(Error, Debug, Clone)]
132pub enum DownlinkRuntimeError {
133 #[error(transparent)]
134 RuntimeError(#[from] AgentRuntimeError),
135 #[error("Opening a new downlink failed: {0}")]
136 DownlinkConnectionFailed(DownlinkFailureReason),
137}
138
139#[derive(Error, Debug, Clone, Copy, PartialEq, Eq)]
141pub enum OpenStoreError {
142 #[error(transparent)]
143 RuntimeError(#[from] AgentRuntimeError),
144 #[error("The runtime does not support stores.")]
145 StoresNotSupported,
146 #[error(
147 "A store of kind {requested} was requested but the pre-existing store is of kind {actual}."
148 )]
149 IncorrectStoreKind {
150 requested: StoreKind,
151 actual: StoreKind,
152 },
153}
154
155#[derive(Clone, Debug, Error, PartialEq, Eq)]
157pub enum DynamicRegistrationError {
158 #[error("This agent does not support dynamically registered items.")]
159 DynamicRegistrationsNotSupported,
160 #[error("This agent only supports dynamic registration during initialization.")]
161 AfterInitialization,
162 #[error("The requested item name '{0}' is already in use.")]
163 DuplicateName(String),
164 #[error("This agent does not support dynamically adding lanes of type: {0}")]
165 LaneKindUnsupported(WarpLaneKind),
166 #[error("This agent does not support dynamically adding stores of type: {0}")]
167 StoreKindUnsupported(StoreKind),
168 #[error("This agent does not support dynamically adding HTTP lanes.")]
169 HttpLanesUnsupported,
170}
171
172#[derive(Clone, Debug, Error, PartialEq, Eq)]
174pub enum LaneSpawnError {
175 #[error(transparent)]
177 Runtime(#[from] AgentRuntimeError),
178 #[error(transparent)]
180 Registration(#[from] DynamicRegistrationError),
181}
182
183impl<T> From<mpsc::error::SendError<T>> for AgentRuntimeError {
184 fn from(_: mpsc::error::SendError<T>) -> Self {
185 AgentRuntimeError::Terminated
186 }
187}
188
189impl<T> From<mpsc::error::SendError<T>> for OpenStoreError {
190 fn from(_: mpsc::error::SendError<T>) -> Self {
191 OpenStoreError::RuntimeError(AgentRuntimeError::Terminated)
192 }
193}
194
195impl<T> From<watch::error::SendError<T>> for AgentRuntimeError {
196 fn from(_: watch::error::SendError<T>) -> Self {
197 AgentRuntimeError::Terminated
198 }
199}
200
201impl From<promise::PromiseError> for AgentRuntimeError {
202 fn from(_: promise::PromiseError) -> Self {
203 AgentRuntimeError::Terminated
204 }
205}
206
207impl From<oneshot::error::RecvError> for AgentRuntimeError {
208 fn from(_: oneshot::error::RecvError) -> Self {
209 AgentRuntimeError::Terminated
210 }
211}
212
213impl From<oneshot::error::RecvError> for OpenStoreError {
214 fn from(_: oneshot::error::RecvError) -> Self {
215 OpenStoreError::RuntimeError(AgentRuntimeError::Terminated)
216 }
217}
218
219impl<T> From<mpsc::error::SendError<T>> for DownlinkRuntimeError {
220 fn from(_: mpsc::error::SendError<T>) -> Self {
221 DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
222 }
223}
224
225impl<T> From<watch::error::SendError<T>> for DownlinkRuntimeError {
226 fn from(_: watch::error::SendError<T>) -> Self {
227 DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
228 }
229}
230
231impl From<promise::PromiseError> for DownlinkRuntimeError {
232 fn from(_: promise::PromiseError) -> Self {
233 DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
234 }
235}
236
237impl From<oneshot::error::RecvError> for DownlinkRuntimeError {
238 fn from(_: oneshot::error::RecvError) -> Self {
239 DownlinkRuntimeError::RuntimeError(AgentRuntimeError::Terminated)
240 }
241}
242
243impl Recoverable for DownlinkFailureReason {
244 fn is_fatal(&self) -> bool {
245 match self {
246 DownlinkFailureReason::InvalidUrl => true,
247 DownlinkFailureReason::UnresolvableRemote(_) => false,
248 DownlinkFailureReason::ConnectionFailed(_) => false,
249 DownlinkFailureReason::WebsocketNegotiationFailed(_) => false,
250 DownlinkFailureReason::RemoteStopped => false,
251 DownlinkFailureReason::DownlinkStopped => false,
252 DownlinkFailureReason::UnresolvableLocal(_) => true,
253 DownlinkFailureReason::TlsConnectionFailed { recoverable, .. } => !recoverable,
254 }
255 }
256}
257
258impl Recoverable for DownlinkRuntimeError {
259 fn is_fatal(&self) -> bool {
260 match self {
261 DownlinkRuntimeError::RuntimeError(_) => true,
262 DownlinkRuntimeError::DownlinkConnectionFailed(err) => err.is_fatal(),
263 }
264 }
265}
266
267#[derive(Error, Debug)]
269pub enum AgentTaskError {
270 #[error("Bad frame for lane '{lane}': {error}")]
271 BadFrame { lane: Text, error: FrameIoError },
272 #[error("Failed to deserialize frame body: {0}")]
273 DeserializationFailed(#[from] ReadError),
274 #[error("Error in use code (likely an event handler): {0}")]
275 UserCodeError(Box<dyn std::error::Error + Send>),
276 #[error("The agent failed to generate a required output: {0}")]
277 OutputFailed(std::io::Error),
278 #[error("Attempting to register a commander failed.")]
279 CommanderRegistrationFailed,
280}
281
282#[derive(Error, Debug)]
284pub enum AgentInitError {
285 #[error("The agent failed to start.")]
286 FailedToStart,
287 #[error("Multiple lanes with the same name: {0}")]
288 DuplicateLane(Text),
289 #[error("Error in use code (likely an event handler): {0}")]
290 UserCodeError(Box<dyn std::error::Error + Send>),
291 #[error("Initializing the state of an agent lane failed: {0}")]
292 LaneInitializationFailure(FrameIoError),
293 #[error("Attempting to dynamically register a lane failed: {0}")]
294 RegistrationFailure(#[from] DynamicRegistrationError),
295 #[error(
296 "Requested a store of kind {requested} for item {name} but store was of kind {actual}."
297 )]
298 IncorrectStoreKind {
299 name: Text,
300 requested: StoreKind,
301 actual: StoreKind,
302 },
303 #[error("The parameters passed to the agent were invalid: {0}")]
304 InvalidParameters(#[from] UnapplyError),
305 #[error("Failed to initialize introspection for an agent: {0}")]
306 NodeIntrospection(#[from] NodeIntrospectionError),
307 #[error("Failed to initialize introspection for a labe: {0}")]
308 LaneIntrospection(#[from] LaneIntrospectionError),
309}
310
311impl From<AgentRuntimeError> for AgentInitError {
313 fn from(_: AgentRuntimeError) -> Self {
314 AgentInitError::FailedToStart
315 }
316}
317
318#[derive(Debug, Error)]
319pub enum StoreError {
320 #[error("No store available.")]
322 NoStoreAvailable,
323 #[error("The specified key was not found")]
325 KeyNotFound,
326 #[error("The store returned an invalid key")]
328 InvalidKey,
329 #[error("An invalid operation was attempted (e.g. Updating a map entry on a value entry)")]
331 InvalidOperation,
332 #[error("The delegate store engine failed to initialise: {0}")]
334 InitialisationFailure(String),
335 #[error("IO error: {0}")]
337 Io(#[from] io::Error),
338 #[error("Encoding error: {0}")]
340 Encoding(String),
341 #[error("Decoding error: {0}")]
343 Decoding(String),
344 #[error("Delegate store error: {0}")]
346 Delegate(Box<dyn std::error::Error + Send + Sync>),
347 #[error("Delegate store error: {0}")]
349 DelegateMessage(String),
350 #[error("An operation was attempted on the delegate store engine when it was closing")]
352 Closing,
353 #[error("The requested keyspace was not found")]
355 KeyspaceNotFound,
356}
357
358impl StoreError {
359 pub fn downcast_ref<E: std::error::Error + 'static>(&self) -> Option<&E> {
360 match self {
361 StoreError::Delegate(d) => {
362 if let Some(downcasted) = d.downcast_ref() {
363 return Some(downcasted);
364 }
365 None
366 }
367 _ => None,
368 }
369 }
370}
371
372impl PartialEq for StoreError {
373 fn eq(&self, other: &Self) -> bool {
374 match (self, other) {
375 (StoreError::KeyNotFound, StoreError::KeyNotFound) => true,
376 (StoreError::InitialisationFailure(l), StoreError::InitialisationFailure(r)) => l.eq(r),
377 (StoreError::Io(l), StoreError::Io(r)) => l.kind().eq(&r.kind()),
378 (StoreError::Encoding(l), StoreError::Encoding(r)) => l.eq(r),
379 (StoreError::Decoding(l), StoreError::Decoding(r)) => l.eq(r),
380 (StoreError::Delegate(l), StoreError::Delegate(r)) => l.to_string().eq(&r.to_string()),
381 (StoreError::DelegateMessage(l), StoreError::DelegateMessage(r)) => l.eq(r),
382 (StoreError::Closing, StoreError::Closing) => true,
383 (StoreError::KeyspaceNotFound, StoreError::KeyspaceNotFound) => true,
384 _ => false,
385 }
386 }
387}