Skip to main content

actr_framework/
workload.rs

1//! Workload trait - Executable actor workload
2//!
3//! Defines the user-facing programming interface: each logical Actor
4//! implements (or has generated for it) a [`Workload`] that associates a
5//! [`MessageDispatcher`] and exposes a fixed set of observation hooks the
6//! framework fires over the lifetime of the actor node.
7
8use std::time::SystemTime;
9
10use actr_protocol::{ActorResult, ActrError, ActrId};
11use async_trait::async_trait;
12
13use crate::{Context, MessageDispatcher};
14
15// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
16// Event payloads
17// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
18
19/// Peer-scoped event payload for transport hooks.
20///
21/// Used by WebSocket and WebRTC hook callbacks to identify the remote peer
22/// involved in the state change. For WebRTC, `relayed` reports whether the
23/// selected ICE candidate pair traversed a TURN relay; for WebSocket the
24/// field is always `None` (not applicable).
25#[derive(Debug, Clone)]
26pub struct PeerEvent {
27    /// Remote peer identity.
28    pub peer: ActrId,
29    /// `Some(true)` if the WebRTC connection is TURN-relayed, `Some(false)` for
30    /// a direct peer-to-peer connection. Always `None` for WebSocket events.
31    pub relayed: Option<bool>,
32}
33
34/// Error event payload passed to [`Workload::on_error`].
35///
36/// Wraps the structured cause ([`ActrError`]), a coarse domain
37/// classification, free-form context describing where the error happened,
38/// and a wall-clock timestamp captured at the reporting site.
39#[derive(Debug, Clone)]
40pub struct ErrorEvent {
41    /// Underlying protocol error.
42    pub source: ActrError,
43    /// Coarse category to aid routing / alerting in user handlers.
44    pub category: ErrorCategory,
45    /// Free-form human-readable context (route key, handler name, stage, ...).
46    pub context: String,
47    /// Wall-clock timestamp captured when the event was emitted.
48    pub timestamp: SystemTime,
49}
50
51impl ErrorEvent {
52    /// Convenience constructor: stamp `timestamp` with `SystemTime::now()`.
53    pub fn now(source: ActrError, category: ErrorCategory, context: impl Into<String>) -> Self {
54        Self {
55            source,
56            category,
57            context: context.into(),
58            timestamp: SystemTime::now(),
59        }
60    }
61}
62
/// Coarse fault-domain classification carried by [`ErrorEvent`].
///
/// Modelled on the top-level protocol [`actr_protocol::ErrorKind`], but
/// tailored to the *dispatch* boundary. Callers typically want to separate two
/// kinds of failure:
/// - "user code failed" (`HandlerPanic` / `HandlerError`);
/// - "the plumbing underneath failed" (`SignalingFailure` /
///   `TransportFailure`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ErrorCategory {
    /// A user handler panicked. `source` is typically an
    /// [`ActrError::DecodeFailure`] that wraps the panic message.
    HandlerPanic,
    /// A user handler returned `Err` in the ordinary way. `source` is the
    /// handler's own [`ActrError`] value.
    HandlerError,
    /// The signaling layer failed (AIS registration, reconnect, credential
    /// verification).
    SignalingFailure,
    /// The transport layer failed. This covers WebSocket / WebRTC connection
    /// errors as well as lane / mpsc plumbing faults.
    TransportFailure,
    /// A `send_data_stream` was in flight when the WebRTC/DataChannel path was
    /// interrupted. Delivery is uncertain: the framework has neither confirmed
    /// loss nor attempted a resume.
    DataStreamDeliveryUncertain,
}
88
/// Credential lifecycle event.
///
/// Delivered in two situations:
/// - through [`Workload::on_credential_renewed`], on the initial registration
///   and on every later renewal;
/// - through [`Workload::on_credential_expiring`], as an early warning that the
///   active credential is close to its expiry.
#[derive(Clone, Debug)]
pub struct CredentialEvent {
    /// Absolute expiry time of the credential that caused this event.
    pub new_expiry: SystemTime,
}
100
/// Mailbox backpressure event.
///
/// Passed to [`Workload::on_mailbox_backpressure`]. It fires once per
/// incident in which mailbox pressure crosses the configured threshold.
///
/// The trip level is tuned on the hyper layer through
/// `HyperConfig::mailbox_backpressure_threshold` (crate `actr-hyper`).
/// `HyperConfig` is not in scope in this crate, so it is written as plain
/// code. An intra-doc link to it would not resolve, and rustdoc would report
/// it as broken.
#[derive(Debug, Clone, Copy)]
pub struct BackpressureEvent {
    /// Number of queued messages at the moment the sample was taken.
    pub queue_len: usize,
    /// The configured threshold that was crossed.
    pub threshold: usize,
}
113
114// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
115// Workload trait
116// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
117
118/// Workload — Executable Actor workload
119///
120/// Represents a complete Actor instance, composed of:
121/// - an associated [`MessageDispatcher`] ([`Workload::Dispatcher`]) that
122///   knows how to decode and route incoming RPC envelopes;
123/// - sixteen observation hooks grouped by category (see below), each with a
124///   meaningful default that emits a `tracing` record so unmodified
125///   workloads produce useful operational logs out of the box.
126///
127/// # Design
128///
129/// - **Bidirectional association**: [`Workload::Dispatcher`] and
130///   [`MessageDispatcher::Workload`] refer to each other so that generated
131///   dispatchers can access the user-implemented workload type.
132/// - **Meaningful defaults**: every hook has a `tracing`-emitting default
133///   (`info` / `debug` / `warn` / `error` levels tuned per hook). Users
134///   override only the hooks whose behaviour they want to extend.
135/// - **Auto-implementation**: the code generator emits
136///   `impl<T: Handler> Workload for Wrapper<T> { type Dispatcher = Router<T>; }`
137///   which inherits every default implementation. Generated wrappers thus
138///   get 16 hooks "for free".
139///
140/// # Hook categories
141///
142/// Hooks are organised into six categories. Override only the hooks you need.
143///
144/// ## Lifecycle (4) — fallible
145/// - [`Workload::on_start`] — node started, before accepting requests
146/// - [`Workload::on_ready`] — node registered and ready to accept requests
147/// - [`Workload::on_stop`] — shutdown signal received
148/// - [`Workload::on_error`] — framework caught a runtime error
149///
150/// Lifecycle hooks return [`ActorResult`]; errors propagate and abort startup
151/// (for `on_start`) or are logged by the framework (for the others).
152///
153/// ## Signaling (3) — infallible
154/// - [`Workload::on_signaling_connecting`]
155/// - [`Workload::on_signaling_connected`]
156/// - [`Workload::on_signaling_disconnected`]
157///
158/// ## Transport — WebSocket (3) — infallible
159/// - [`Workload::on_websocket_connecting`]
160/// - [`Workload::on_websocket_connected`]
161/// - [`Workload::on_websocket_disconnected`]
162///
163/// ## Transport — WebRTC P2P (3) — infallible
164/// - [`Workload::on_webrtc_connecting`]
165/// - [`Workload::on_webrtc_connected`] — includes `relayed` info via
166///   [`PeerEvent::relayed`]
167/// - [`Workload::on_webrtc_disconnected`]
168///
169/// ## Credential (2) — infallible
170/// - [`Workload::on_credential_renewed`]
171/// - [`Workload::on_credential_expiring`]
172///
173/// ## Mailbox (1) — infallible
174/// - [`Workload::on_mailbox_backpressure`]
175///
176/// # Code Generation Example
177///
178/// ```rust,ignore
179/// // User-implemented Handler
180/// pub struct MyEchoService { /* ... */ }
181///
182/// impl EchoServiceHandler for MyEchoService {
183///     async fn echo<C: Context>(
184///         &self,
185///         req: EchoRequest,
186///         ctx: &C,
187///     ) -> ActorResult<EchoResponse> {
188///         Ok(EchoResponse { reply: format!("Echo: {}", req.message) })
189///     }
190/// }
191///
192/// // Code-generated Workload wrapper — inherits all 16 defaults.
193/// pub struct EchoServiceWorkload<T: EchoServiceHandler>(pub T);
194///
195/// impl<T: EchoServiceHandler> Workload for EchoServiceWorkload<T> {
196///     type Dispatcher = EchoServiceRouter<T>;
197/// }
198/// ```
199// Workload trait is `?Send` on `wasm32` (browser single-threaded) and keeps
200// the native default `Send` auto trait elsewhere so tokio-multi-thread-backed
201// adapters on the native side continue to produce `Send` futures without
202// fighting `async_trait`. Per Option U γ-unified §3.2 the user-facing bound
203// is `'static`; `MaybeSendSync` silently re-adds `Send + Sync` on native so
204// the lifecycle-hook default bodies and the hyper `WorkloadAdapter`
205// downstream see the `Send` futures they require.
206#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
207#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
208pub trait Workload: crate::MaybeSendSync + 'static {
209    /// Associated dispatcher type.
210    type Dispatcher: MessageDispatcher<Workload = Self>;
211
212    // ─────────────────────────────────────────────────────────────────────
213    // Lifecycle
214    // ─────────────────────────────────────────────────────────────────────
215
216    /// Called when the node has started.
217    ///
218    /// Use this to initialise business resources, start timers, etc. Returning
219    /// `Err` aborts node startup.
220    async fn on_start<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
221        tracing::info!("workload on_start");
222        Ok(())
223    }
224
225    /// Called when signaling is connected and registration is complete: the
226    /// node is now discoverable and may serve requests.
227    async fn on_ready<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
228        tracing::info!("workload on_ready (node ready to accept requests)");
229        Ok(())
230    }
231
232    /// Called when the node receives a shutdown signal.
233    ///
234    /// Use this to release business resources and persist state.
235    async fn on_stop<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
236        tracing::info!("workload on_stop");
237        Ok(())
238    }
239
240    /// Called when the framework catches a runtime error.
241    ///
242    /// See [`ErrorEvent`] and [`ErrorCategory`] for the structure of the
243    /// argument. Use this for alerting, logging, or graceful degradation.
244    async fn on_error<C: Context>(&self, _ctx: &C, event: &ErrorEvent) -> ActorResult<()> {
245        tracing::error!(
246            category = ?event.category,
247            context = %event.context,
248            source = %event.source,
249            "workload on_error",
250        );
251        Ok(())
252    }
253
254    // ─────────────────────────────────────────────────────────────────────
255    // Signaling
256    // ─────────────────────────────────────────────────────────────────────
257
258    /// Called when signaling connection attempt begins.
259    ///
260    /// `ctx` is `None` during the initial connection (before the node has
261    /// obtained its identity) and `Some` for every subsequent reconnect.
262    async fn on_signaling_connecting<C: Context>(&self, _ctx: Option<&C>) {
263        tracing::debug!("signaling connecting");
264    }
265
266    /// Called when signaling connection is established. Actor is online.
267    ///
268    /// `ctx` is `None` during the initial connection and `Some` for every
269    /// subsequent reconnect.
270    async fn on_signaling_connected<C: Context>(&self, _ctx: Option<&C>) {
271        tracing::info!("signaling connected");
272    }
273
274    /// Called when signaling connection is lost. Actor is offline.
275    async fn on_signaling_disconnected<C: Context>(&self, _ctx: &C) {
276        tracing::warn!("signaling disconnected");
277    }
278
279    // ─────────────────────────────────────────────────────────────────────
280    // Transport — WebSocket
281    // ─────────────────────────────────────────────────────────────────────
282
283    /// Called when a WebSocket connection attempt to a peer begins.
284    async fn on_websocket_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
285        tracing::debug!(peer = %event.peer, "websocket connecting");
286    }
287
288    /// Called when a WebSocket connection to a peer is established.
289    async fn on_websocket_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
290        tracing::info!(peer = %event.peer, "websocket connected");
291    }
292
293    /// Called when a WebSocket connection to a peer is lost.
294    async fn on_websocket_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
295        tracing::warn!(peer = %event.peer, "websocket disconnected");
296    }
297
298    // ─────────────────────────────────────────────────────────────────────
299    // Transport — WebRTC P2P
300    // ─────────────────────────────────────────────────────────────────────
301
302    /// Called when a WebRTC P2P connection attempt to a peer begins.
303    async fn on_webrtc_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
304        tracing::debug!(peer = %event.peer, "webrtc connecting");
305    }
306
307    /// Called when a WebRTC P2P connection to a peer is established.
308    ///
309    /// `event.relayed` carries whether the selected ICE candidate pair
310    /// traverses a TURN relay (`Some(true)`) or is a direct P2P connection
311    /// (`Some(false)`).
312    async fn on_webrtc_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
313        tracing::info!(
314            peer = %event.peer,
315            relayed = ?event.relayed,
316            "webrtc connected",
317        );
318    }
319
320    /// Called when a WebRTC P2P connection to a peer is lost.
321    async fn on_webrtc_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
322        tracing::warn!(peer = %event.peer, "webrtc disconnected");
323    }
324
325    // ─────────────────────────────────────────────────────────────────────
326    // Credential
327    // ─────────────────────────────────────────────────────────────────────
328
329    /// Called when the current credential is renewed (initial registration
330    /// or subsequent refresh).
331    async fn on_credential_renewed<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
332        tracing::info!(new_expiry = ?event.new_expiry, "credential renewed");
333    }
334
335    /// Called when the active credential is approaching its expiry.
336    ///
337    /// The warning lead time is controlled by
338    /// `HyperConfig::credential_expiry_warning` on the hyper layer.
339    async fn on_credential_expiring<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
340        tracing::warn!(
341            new_expiry = ?event.new_expiry,
342            "credential expiring soon",
343        );
344    }
345
346    // ─────────────────────────────────────────────────────────────────────
347    // Mailbox
348    // ─────────────────────────────────────────────────────────────────────
349
350    /// Called once per incident when the persistent mailbox queue length
351    /// crosses the configured backpressure threshold.
352    ///
353    /// Fires once per cross (rising edge); the framework resets the
354    /// triggered flag once the queue falls below the threshold again.
355    async fn on_mailbox_backpressure<C: Context>(&self, _ctx: &C, event: &BackpressureEvent) {
356        tracing::warn!(
357            queue_len = event.queue_len,
358            threshold = event.threshold,
359            "mailbox backpressure",
360        );
361    }
362}