Skip to main content

actr_framework/
workload.rs

1//! Workload trait - Executable actor workload
2//!
3//! Defines the user-facing programming interface: each logical Actor
4//! implements (or has generated for it) a [`Workload`] that associates a
5//! [`MessageDispatcher`] and exposes a fixed set of observation hooks the
6//! framework fires over the lifetime of the actor node.
7
8use std::time::SystemTime;
9
10use actr_protocol::{ActorResult, ActrError, ActrId};
11use async_trait::async_trait;
12
13use crate::{Context, MessageDispatcher};
14
15// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
16// Event payloads
17// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
18
/// Public WebRTC peer status exposed through observation hooks.
///
/// Delivered in [`PeerEvent::status`], where it reports the coarse
/// send-readiness state of a WebRTC peer connection.
//
// NOTE(review): the precise meaning of each variant is decided by the
// transport layer, which is not visible in this file — confirm before
// documenting per-variant semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebRtcPeerStatus {
    Idle,
    Connecting,
    Connected,
    Recovering,
}
29
/// Peer-scoped event payload for transport hooks.
///
/// Used by WebSocket and WebRTC hook callbacks to identify the remote peer
/// involved in the state change. For WebRTC, `relayed` reports whether the
/// selected ICE candidate pair traversed a TURN relay, and `status` reports
/// the coarse send-readiness state. For WebSocket both fields are always
/// `None` (not applicable).
#[derive(Debug, Clone)]
pub struct PeerEvent {
    /// Remote peer identity.
    pub peer: ActrId,
    /// `Some(true)` if the WebRTC connection is TURN-relayed, `Some(false)` for
    /// a direct peer-to-peer connection. Always `None` for WebSocket events.
    pub relayed: Option<bool>,
    /// Coarse WebRTC send-readiness state. Always `None` for WebSocket events.
    pub status: Option<WebRtcPeerStatus>,
}
45
/// Error event payload passed to [`Workload::on_error`].
///
/// Wraps the structured cause ([`ActrError`]), a coarse domain
/// classification, free-form context describing where the error happened,
/// and a wall-clock timestamp captured at the reporting site.
///
/// Use [`ErrorEvent::now`] to build an event stamped with the current time.
#[derive(Debug, Clone)]
pub struct ErrorEvent {
    /// Underlying protocol error.
    pub source: ActrError,
    /// Coarse category to aid routing / alerting in user handlers.
    pub category: ErrorCategory,
    /// Free-form human-readable context (route key, handler name, stage, ...).
    pub context: String,
    /// Wall-clock timestamp captured when the event was emitted.
    pub timestamp: SystemTime,
}
62
63impl ErrorEvent {
64    /// Convenience constructor: stamp `timestamp` with `SystemTime::now()`.
65    pub fn now(source: ActrError, category: ErrorCategory, context: impl Into<String>) -> Self {
66        Self {
67            source,
68            category,
69            context: context.into(),
70            timestamp: SystemTime::now(),
71        }
72    }
73}
74
/// Coarse fault-domain classification for [`ErrorEvent`].
///
/// Mirrors the top-level protocol [`actr_protocol::ErrorKind`] but is
/// specialised for the *dispatch* boundary — callers want to distinguish
/// "user code blew up" (`HandlerPanic` / `HandlerError`) from "the plumbing
/// under them failed" (`SignalingFailure` / `TransportFailure`).
/// `DataStreamDeliveryUncertain` is a separate case: it reports an
/// interrupted data stream whose delivery outcome is unknown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCategory {
    /// User handler code panicked. `source` will typically be a
    /// [`ActrError::DecodeFailure`] wrapping the panic message.
    HandlerPanic,
    /// User handler returned `Err` normally. `source` is the handler's
    /// own [`ActrError`] value.
    HandlerError,
    /// Signaling layer failure (AIS registration, reconnect, credential
    /// verification).
    SignalingFailure,
    /// Transport layer failure: WebSocket / WebRTC connection errors, lane
    /// / mpsc plumbing faults.
    TransportFailure,
    /// A `send_data_stream` was active when the WebRTC/DataChannel path was
    /// interrupted. Delivery is uncertain; the framework has not confirmed
    /// loss or performed resume.
    DataStreamDeliveryUncertain,
}
100
/// Credential lifecycle event.
///
/// Fired on initial registration and any subsequent renewal (via
/// [`Workload::on_credential_renewed`]), and also used to warn when an
/// active credential is approaching its expiry (via
/// [`Workload::on_credential_expiring`]).
///
/// The same payload type serves both hooks, so read `new_expiry` as "the
/// expiry of the credential this event is about": the freshly issued one for
/// a renewal, the still-active one for an expiry warning.
#[derive(Debug, Clone)]
pub struct CredentialEvent {
    /// Absolute expiry time of the credential that triggered the event.
    pub new_expiry: SystemTime,
}
112
/// Mailbox backpressure event.
///
/// Fires once per "mailbox pressure crossed configured threshold" incident.
/// The trip level is tuned through
/// `HyperConfig::mailbox_backpressure_threshold`, which lives in the
/// `actr-hyper` crate (so it is named in plain backticks rather than as an
/// intra-doc link, which could not be resolved from this crate).
///
/// The type is plain `Copy` data and implements `PartialEq` / `Eq`, so
/// events can be compared directly, e.g. in handler tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackpressureEvent {
    /// Current queued-message count at the time of the sample.
    pub queue_len: usize,
    /// Threshold that was crossed.
    pub threshold: usize,
}
125
126// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
127// Workload trait
128// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
129
130/// Workload — Executable Actor workload
131///
132/// Represents a complete Actor instance, composed of:
133/// - an associated [`MessageDispatcher`] ([`Workload::Dispatcher`]) that
134///   knows how to decode and route incoming RPC envelopes;
135/// - sixteen observation hooks grouped by category (see below), each with a
136///   meaningful default that emits a `tracing` record so unmodified
137///   workloads produce useful operational logs out of the box.
138///
139/// # Design
140///
141/// - **Bidirectional association**: [`Workload::Dispatcher`] and
142///   [`MessageDispatcher::Workload`] refer to each other so that generated
143///   dispatchers can access the user-implemented workload type.
144/// - **Meaningful defaults**: every hook has a `tracing`-emitting default
145///   (`info` / `debug` / `warn` / `error` levels tuned per hook). Users
146///   override only the hooks whose behaviour they want to extend.
147/// - **Auto-implementation**: the code generator emits
148///   `impl<T: Handler> Workload for Wrapper<T> { type Dispatcher = Router<T>; }`
149///   which inherits every default implementation. Generated wrappers thus
150///   get 16 hooks "for free".
151///
152/// # Hook categories
153///
154/// Hooks are organised into six categories. Override only the hooks you need.
155///
156/// ## Lifecycle (4) — fallible
157/// - [`Workload::on_start`] — node started, before accepting requests
158/// - [`Workload::on_ready`] — node registered and ready to accept requests
159/// - [`Workload::on_stop`] — shutdown signal received
160/// - [`Workload::on_error`] — framework caught a runtime error
161///
162/// Lifecycle hooks return [`ActorResult`]; errors propagate and abort startup
163/// (for `on_start`) or are logged by the framework (for the others).
164///
165/// ## Signaling (3) — infallible
166/// - [`Workload::on_signaling_connecting`]
167/// - [`Workload::on_signaling_connected`]
168/// - [`Workload::on_signaling_disconnected`]
169///
170/// ## Transport — WebSocket (3) — infallible
171/// - [`Workload::on_websocket_connecting`]
172/// - [`Workload::on_websocket_connected`]
173/// - [`Workload::on_websocket_disconnected`]
174///
175/// ## Transport — WebRTC P2P (3) — infallible
176/// - [`Workload::on_webrtc_connecting`]
177/// - [`Workload::on_webrtc_connected`] — includes `relayed` info via
178///   [`PeerEvent::relayed`]
179/// - [`Workload::on_webrtc_disconnected`]
180///
181/// ## Credential (2) — infallible
182/// - [`Workload::on_credential_renewed`]
183/// - [`Workload::on_credential_expiring`]
184///
185/// ## Mailbox (1) — infallible
186/// - [`Workload::on_mailbox_backpressure`]
187///
188/// # Code Generation Example
189///
190/// ```rust,ignore
191/// // User-implemented Handler
192/// pub struct MyEchoService { /* ... */ }
193///
194/// impl EchoServiceHandler for MyEchoService {
195///     async fn echo<C: Context>(
196///         &self,
197///         req: EchoRequest,
198///         ctx: &C,
199///     ) -> ActorResult<EchoResponse> {
200///         Ok(EchoResponse { reply: format!("Echo: {}", req.message) })
201///     }
202/// }
203///
204/// // Code-generated Workload wrapper — inherits all 16 defaults.
205/// pub struct EchoServiceWorkload<T: EchoServiceHandler>(pub T);
206///
207/// impl<T: EchoServiceHandler> Workload for EchoServiceWorkload<T> {
208///     type Dispatcher = EchoServiceRouter<T>;
209/// }
210/// ```
211// Workload trait is `?Send` on `wasm32` (browser single-threaded) and keeps
212// the native default `Send` auto trait elsewhere so tokio-multi-thread-backed
213// adapters on the native side continue to produce `Send` futures without
214// fighting `async_trait`. Per Option U γ-unified §3.2 the user-facing bound
215// is `'static`; `MaybeSendSync` silently re-adds `Send + Sync` on native so
216// the lifecycle-hook default bodies and the hyper `WorkloadAdapter`
217// downstream see the `Send` futures they require.
218#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
219#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
220pub trait Workload: crate::MaybeSendSync + 'static {
221    /// Associated dispatcher type.
222    type Dispatcher: MessageDispatcher<Workload = Self>;
223
224    // ─────────────────────────────────────────────────────────────────────
225    // Lifecycle
226    // ─────────────────────────────────────────────────────────────────────
227
228    /// Called when the node has started.
229    ///
230    /// Use this to initialise business resources, start timers, etc. Returning
231    /// `Err` aborts node startup.
232    async fn on_start<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
233        tracing::info!("workload on_start");
234        Ok(())
235    }
236
237    /// Called when signaling is connected and registration is complete: the
238    /// node is now discoverable and may serve requests.
239    async fn on_ready<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
240        tracing::info!("workload on_ready (node ready to accept requests)");
241        Ok(())
242    }
243
244    /// Called when the node receives a shutdown signal.
245    ///
246    /// Use this to release business resources and persist state.
247    async fn on_stop<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
248        tracing::info!("workload on_stop");
249        Ok(())
250    }
251
252    /// Called when the framework catches a runtime error.
253    ///
254    /// See [`ErrorEvent`] and [`ErrorCategory`] for the structure of the
255    /// argument. Use this for alerting, logging, or graceful degradation.
256    async fn on_error<C: Context>(&self, _ctx: &C, event: &ErrorEvent) -> ActorResult<()> {
257        tracing::error!(
258            category = ?event.category,
259            context = %event.context,
260            source = %event.source,
261            "workload on_error",
262        );
263        Ok(())
264    }
265
266    // ─────────────────────────────────────────────────────────────────────
267    // Signaling
268    // ─────────────────────────────────────────────────────────────────────
269
270    /// Called when signaling connection attempt begins.
271    ///
272    /// `ctx` is `None` during the initial connection (before the node has
273    /// obtained its identity) and `Some` for every subsequent reconnect.
274    async fn on_signaling_connecting<C: Context>(&self, _ctx: Option<&C>) {
275        tracing::debug!("signaling connecting");
276    }
277
278    /// Called when signaling connection is established. Actor is online.
279    ///
280    /// `ctx` is `None` during the initial connection and `Some` for every
281    /// subsequent reconnect.
282    async fn on_signaling_connected<C: Context>(&self, _ctx: Option<&C>) {
283        tracing::info!("signaling connected");
284    }
285
286    /// Called when signaling connection is lost. Actor is offline.
287    async fn on_signaling_disconnected<C: Context>(&self, _ctx: &C) {
288        tracing::warn!("signaling disconnected");
289    }
290
291    // ─────────────────────────────────────────────────────────────────────
292    // Transport — WebSocket
293    // ─────────────────────────────────────────────────────────────────────
294
295    /// Called when a WebSocket connection attempt to a peer begins.
296    async fn on_websocket_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
297        tracing::debug!(peer = %event.peer, "websocket connecting");
298    }
299
300    /// Called when a WebSocket connection to a peer is established.
301    async fn on_websocket_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
302        tracing::info!(peer = %event.peer, "websocket connected");
303    }
304
305    /// Called when a WebSocket connection to a peer is lost.
306    async fn on_websocket_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
307        tracing::warn!(peer = %event.peer, "websocket disconnected");
308    }
309
310    // ─────────────────────────────────────────────────────────────────────
311    // Transport — WebRTC P2P
312    // ─────────────────────────────────────────────────────────────────────
313
314    /// Called when a WebRTC P2P connection attempt to a peer begins.
315    async fn on_webrtc_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
316        tracing::debug!(peer = %event.peer, "webrtc connecting");
317    }
318
319    /// Called when a WebRTC P2P connection to a peer is established.
320    ///
321    /// `event.relayed` carries whether the selected ICE candidate pair
322    /// traverses a TURN relay (`Some(true)`) or is a direct P2P connection
323    /// (`Some(false)`).
324    async fn on_webrtc_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
325        tracing::info!(
326            peer = %event.peer,
327            relayed = ?event.relayed,
328            "webrtc connected",
329        );
330    }
331
332    /// Called when a WebRTC P2P connection to a peer is lost.
333    async fn on_webrtc_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
334        tracing::warn!(peer = %event.peer, "webrtc disconnected");
335    }
336
337    // ─────────────────────────────────────────────────────────────────────
338    // Credential
339    // ─────────────────────────────────────────────────────────────────────
340
341    /// Called when the current credential is renewed (initial registration
342    /// or subsequent refresh).
343    async fn on_credential_renewed<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
344        tracing::info!(new_expiry = ?event.new_expiry, "credential renewed");
345    }
346
347    /// Called when the active credential is approaching its expiry.
348    ///
349    /// The warning lead time is controlled by
350    /// `HyperConfig::credential_expiry_warning` on the hyper layer.
351    async fn on_credential_expiring<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
352        tracing::warn!(
353            new_expiry = ?event.new_expiry,
354            "credential expiring soon",
355        );
356    }
357
358    // ─────────────────────────────────────────────────────────────────────
359    // Mailbox
360    // ─────────────────────────────────────────────────────────────────────
361
362    /// Called once per incident when the persistent mailbox queue length
363    /// crosses the configured backpressure threshold.
364    ///
365    /// Fires once per cross (rising edge); the framework resets the
366    /// triggered flag once the queue falls below the threshold again.
367    async fn on_mailbox_backpressure<C: Context>(&self, _ctx: &C, event: &BackpressureEvent) {
368        tracing::warn!(
369            queue_len = event.queue_len,
370            threshold = event.threshold,
371            "mailbox backpressure",
372        );
373    }
374}