actr_framework/workload.rs
1//! Workload trait - Executable actor workload
2//!
3//! Defines the user-facing programming interface: each logical Actor
4//! implements (or has generated for it) a [`Workload`] that associates a
5//! [`MessageDispatcher`] and exposes a fixed set of observation hooks the
6//! framework fires over the lifetime of the actor node.
7
8use std::time::SystemTime;
9
10use actr_protocol::{ActorResult, ActrError, ActrId};
11use async_trait::async_trait;
12
13use crate::{Context, MessageDispatcher};
14
15// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
16// Event payloads
17// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
18
/// Public WebRTC peer status exposed through observation hooks.
///
/// Carried by [`PeerEvent::status`] as the coarse send-readiness state of a
/// WebRTC peer. The value is a plain `Copy` tag and can be compared with `==`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm them against the transport layer that emits
/// these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebRtcPeerStatus {
    /// Presumably: no connection attempt is in progress.
    Idle,
    /// Presumably: a connection attempt is under way.
    Connecting,
    /// Presumably: the connection is established and usable for sending.
    Connected,
    /// Presumably: an established connection was interrupted and is being
    /// restored.
    Recovering,
}
29
30/// Used by WebSocket and WebRTC hook callbacks to identify the remote peer
31/// involved in the state change. For WebRTC, `relayed` reports whether the
32/// selected ICE candidate pair traversed a TURN relay, and `status` reports
33/// the coarse send-readiness state. For WebSocket both fields are always
34/// `None` (not applicable).
35#[derive(Debug, Clone)]
36pub struct PeerEvent {
37 /// Remote peer identity.
38 pub peer: ActrId,
39 /// `Some(true)` if the WebRTC connection is TURN-relayed, `Some(false)` for
40 /// a direct peer-to-peer connection. Always `None` for WebSocket events.
41 pub relayed: Option<bool>,
42 /// Coarse WebRTC send-readiness state. Always `None` for WebSocket events.
43 pub status: Option<WebRtcPeerStatus>,
44}
45
46/// Error event payload passed to [`Workload::on_error`].
47///
48/// Wraps the structured cause ([`ActrError`]), a coarse domain
49/// classification, free-form context describing where the error happened,
50/// and a wall-clock timestamp captured at the reporting site.
51#[derive(Debug, Clone)]
52pub struct ErrorEvent {
53 /// Underlying protocol error.
54 pub source: ActrError,
55 /// Coarse category to aid routing / alerting in user handlers.
56 pub category: ErrorCategory,
57 /// Free-form human-readable context (route key, handler name, stage, ...).
58 pub context: String,
59 /// Wall-clock timestamp captured when the event was emitted.
60 pub timestamp: SystemTime,
61}
62
63impl ErrorEvent {
64 /// Convenience constructor: stamp `timestamp` with `SystemTime::now()`.
65 pub fn now(source: ActrError, category: ErrorCategory, context: impl Into<String>) -> Self {
66 Self {
67 source,
68 category,
69 context: context.into(),
70 timestamp: SystemTime::now(),
71 }
72 }
73}
74
/// Coarse classification of the fault domain an [`ErrorEvent`] belongs to.
///
/// This follows the top-level protocol [`actr_protocol::ErrorKind`], but is
/// tailored to the *dispatch* boundary: handlers usually need to tell
/// "user code blew up" (`HandlerPanic` / `HandlerError`) apart from "the
/// plumbing under them failed" (`SignalingFailure` / `TransportFailure`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorCategory {
    /// A user handler panicked. The event's `source` will typically be an
    /// [`ActrError::DecodeFailure`] wrapping the panic message.
    HandlerPanic,
    /// A user handler returned `Err` in the normal way. The event's `source`
    /// is the handler's own [`ActrError`] value.
    HandlerError,
    /// The signaling layer failed (AIS registration, reconnect, credential
    /// verification).
    SignalingFailure,
    /// The transport layer failed: WebSocket / WebRTC connection errors,
    /// lane / mpsc plumbing faults.
    TransportFailure,
    /// The WebRTC/DataChannel path was interrupted while a
    /// `send_data_stream` was active. Delivery is uncertain; the framework
    /// has neither confirmed loss nor performed a resume.
    DataStreamDeliveryUncertain,
}
100
/// Event describing a change in the credential lifecycle.
///
/// The same payload serves two hooks:
/// - [`Workload::on_credential_renewed`], fired on initial registration and
///   on every subsequent renewal;
/// - [`Workload::on_credential_expiring`], fired as a warning when the
///   active credential is approaching its expiry.
#[derive(Clone, Debug)]
pub struct CredentialEvent {
    /// Absolute expiry time of the credential that triggered the event.
    pub new_expiry: SystemTime,
}
112
/// Mailbox backpressure event.
///
/// Fires once per "mailbox pressure crossed configured threshold" incident.
/// The trip level is tuned with `HyperConfig::mailbox_backpressure_threshold`
/// on the hyper layer (see `actr-hyper`). That type lives outside this crate,
/// so it is named with a plain code span rather than an intra-doc link, which
/// could not be resolved from here.
#[derive(Debug, Clone, Copy)]
pub struct BackpressureEvent {
    /// Current queued-message count at the time of the sample.
    pub queue_len: usize,
    /// Threshold that was crossed.
    pub threshold: usize,
}
125
126// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
127// Workload trait
128// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
129
130/// Workload — Executable Actor workload
131///
132/// Represents a complete Actor instance, composed of:
133/// - an associated [`MessageDispatcher`] ([`Workload::Dispatcher`]) that
134/// knows how to decode and route incoming RPC envelopes;
135/// - sixteen observation hooks grouped by category (see below), each with a
136/// meaningful default that emits a `tracing` record so unmodified
137/// workloads produce useful operational logs out of the box.
138///
139/// # Design
140///
141/// - **Bidirectional association**: [`Workload::Dispatcher`] and
142/// [`MessageDispatcher::Workload`] refer to each other so that generated
143/// dispatchers can access the user-implemented workload type.
144/// - **Meaningful defaults**: every hook has a `tracing`-emitting default
145/// (`info` / `debug` / `warn` / `error` levels tuned per hook). Users
146/// override only the hooks whose behaviour they want to extend.
147/// - **Auto-implementation**: the code generator emits
148/// `impl<T: Handler> Workload for Wrapper<T> { type Dispatcher = Router<T>; }`
149/// which inherits every default implementation. Generated wrappers thus
150/// get 16 hooks "for free".
151///
152/// # Hook categories
153///
154/// Hooks are organised into six categories. Override only the hooks you need.
155///
156/// ## Lifecycle (4) — fallible
157/// - [`Workload::on_start`] — node started, before accepting requests
158/// - [`Workload::on_ready`] — node registered and ready to accept requests
159/// - [`Workload::on_stop`] — shutdown signal received
160/// - [`Workload::on_error`] — framework caught a runtime error
161///
162/// Lifecycle hooks return [`ActorResult`]; errors propagate and abort startup
163/// (for `on_start`) or are logged by the framework (for the others).
164///
165/// ## Signaling (3) — infallible
166/// - [`Workload::on_signaling_connecting`]
167/// - [`Workload::on_signaling_connected`]
168/// - [`Workload::on_signaling_disconnected`]
169///
170/// ## Transport — WebSocket (3) — infallible
171/// - [`Workload::on_websocket_connecting`]
172/// - [`Workload::on_websocket_connected`]
173/// - [`Workload::on_websocket_disconnected`]
174///
175/// ## Transport — WebRTC P2P (3) — infallible
176/// - [`Workload::on_webrtc_connecting`]
177/// - [`Workload::on_webrtc_connected`] — includes `relayed` info via
178/// [`PeerEvent::relayed`]
179/// - [`Workload::on_webrtc_disconnected`]
180///
181/// ## Credential (2) — infallible
182/// - [`Workload::on_credential_renewed`]
183/// - [`Workload::on_credential_expiring`]
184///
185/// ## Mailbox (1) — infallible
186/// - [`Workload::on_mailbox_backpressure`]
187///
188/// # Code Generation Example
189///
190/// ```rust,ignore
191/// // User-implemented Handler
192/// pub struct MyEchoService { /* ... */ }
193///
194/// impl EchoServiceHandler for MyEchoService {
195/// async fn echo<C: Context>(
196/// &self,
197/// req: EchoRequest,
198/// ctx: &C,
199/// ) -> ActorResult<EchoResponse> {
200/// Ok(EchoResponse { reply: format!("Echo: {}", req.message) })
201/// }
202/// }
203///
204/// // Code-generated Workload wrapper — inherits all 16 defaults.
205/// pub struct EchoServiceWorkload<T: EchoServiceHandler>(pub T);
206///
207/// impl<T: EchoServiceHandler> Workload for EchoServiceWorkload<T> {
208/// type Dispatcher = EchoServiceRouter<T>;
209/// }
210/// ```
211// Workload trait is `?Send` on `wasm32` (browser single-threaded) and keeps
212// the native default `Send` auto trait elsewhere so tokio-multi-thread-backed
213// adapters on the native side continue to produce `Send` futures without
214// fighting `async_trait`. Per Option U γ-unified §3.2 the user-facing bound
215// is `'static`; `MaybeSendSync` silently re-adds `Send + Sync` on native so
216// the lifecycle-hook default bodies and the hyper `WorkloadAdapter`
217// downstream see the `Send` futures they require.
218#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
219#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
220pub trait Workload: crate::MaybeSendSync + 'static {
221 /// Associated dispatcher type.
222 type Dispatcher: MessageDispatcher<Workload = Self>;
223
224 // ─────────────────────────────────────────────────────────────────────
225 // Lifecycle
226 // ─────────────────────────────────────────────────────────────────────
227
228 /// Called when the node has started.
229 ///
230 /// Use this to initialise business resources, start timers, etc. Returning
231 /// `Err` aborts node startup.
232 async fn on_start<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
233 tracing::info!("workload on_start");
234 Ok(())
235 }
236
237 /// Called when signaling is connected and registration is complete: the
238 /// node is now discoverable and may serve requests.
239 async fn on_ready<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
240 tracing::info!("workload on_ready (node ready to accept requests)");
241 Ok(())
242 }
243
244 /// Called when the node receives a shutdown signal.
245 ///
246 /// Use this to release business resources and persist state.
247 async fn on_stop<C: Context>(&self, _ctx: &C) -> ActorResult<()> {
248 tracing::info!("workload on_stop");
249 Ok(())
250 }
251
252 /// Called when the framework catches a runtime error.
253 ///
254 /// See [`ErrorEvent`] and [`ErrorCategory`] for the structure of the
255 /// argument. Use this for alerting, logging, or graceful degradation.
256 async fn on_error<C: Context>(&self, _ctx: &C, event: &ErrorEvent) -> ActorResult<()> {
257 tracing::error!(
258 category = ?event.category,
259 context = %event.context,
260 source = %event.source,
261 "workload on_error",
262 );
263 Ok(())
264 }
265
266 // ─────────────────────────────────────────────────────────────────────
267 // Signaling
268 // ─────────────────────────────────────────────────────────────────────
269
270 /// Called when signaling connection attempt begins.
271 ///
272 /// `ctx` is `None` during the initial connection (before the node has
273 /// obtained its identity) and `Some` for every subsequent reconnect.
274 async fn on_signaling_connecting<C: Context>(&self, _ctx: Option<&C>) {
275 tracing::debug!("signaling connecting");
276 }
277
278 /// Called when signaling connection is established. Actor is online.
279 ///
280 /// `ctx` is `None` during the initial connection and `Some` for every
281 /// subsequent reconnect.
282 async fn on_signaling_connected<C: Context>(&self, _ctx: Option<&C>) {
283 tracing::info!("signaling connected");
284 }
285
286 /// Called when signaling connection is lost. Actor is offline.
287 async fn on_signaling_disconnected<C: Context>(&self, _ctx: &C) {
288 tracing::warn!("signaling disconnected");
289 }
290
291 // ─────────────────────────────────────────────────────────────────────
292 // Transport — WebSocket
293 // ─────────────────────────────────────────────────────────────────────
294
295 /// Called when a WebSocket connection attempt to a peer begins.
296 async fn on_websocket_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
297 tracing::debug!(peer = %event.peer, "websocket connecting");
298 }
299
300 /// Called when a WebSocket connection to a peer is established.
301 async fn on_websocket_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
302 tracing::info!(peer = %event.peer, "websocket connected");
303 }
304
305 /// Called when a WebSocket connection to a peer is lost.
306 async fn on_websocket_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
307 tracing::warn!(peer = %event.peer, "websocket disconnected");
308 }
309
310 // ─────────────────────────────────────────────────────────────────────
311 // Transport — WebRTC P2P
312 // ─────────────────────────────────────────────────────────────────────
313
314 /// Called when a WebRTC P2P connection attempt to a peer begins.
315 async fn on_webrtc_connecting<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
316 tracing::debug!(peer = %event.peer, "webrtc connecting");
317 }
318
319 /// Called when a WebRTC P2P connection to a peer is established.
320 ///
321 /// `event.relayed` carries whether the selected ICE candidate pair
322 /// traverses a TURN relay (`Some(true)`) or is a direct P2P connection
323 /// (`Some(false)`).
324 async fn on_webrtc_connected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
325 tracing::info!(
326 peer = %event.peer,
327 relayed = ?event.relayed,
328 "webrtc connected",
329 );
330 }
331
332 /// Called when a WebRTC P2P connection to a peer is lost.
333 async fn on_webrtc_disconnected<C: Context>(&self, _ctx: &C, event: &PeerEvent) {
334 tracing::warn!(peer = %event.peer, "webrtc disconnected");
335 }
336
337 // ─────────────────────────────────────────────────────────────────────
338 // Credential
339 // ─────────────────────────────────────────────────────────────────────
340
341 /// Called when the current credential is renewed (initial registration
342 /// or subsequent refresh).
343 async fn on_credential_renewed<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
344 tracing::info!(new_expiry = ?event.new_expiry, "credential renewed");
345 }
346
347 /// Called when the active credential is approaching its expiry.
348 ///
349 /// The warning lead time is controlled by
350 /// `HyperConfig::credential_expiry_warning` on the hyper layer.
351 async fn on_credential_expiring<C: Context>(&self, _ctx: &C, event: &CredentialEvent) {
352 tracing::warn!(
353 new_expiry = ?event.new_expiry,
354 "credential expiring soon",
355 );
356 }
357
358 // ─────────────────────────────────────────────────────────────────────
359 // Mailbox
360 // ─────────────────────────────────────────────────────────────────────
361
362 /// Called once per incident when the persistent mailbox queue length
363 /// crosses the configured backpressure threshold.
364 ///
365 /// Fires once per cross (rising edge); the framework resets the
366 /// triggered flag once the queue falls below the threshold again.
367 async fn on_mailbox_backpressure<C: Context>(&self, _ctx: &C, event: &BackpressureEvent) {
368 tracing::warn!(
369 queue_len = event.queue_len,
370 threshold = event.threshold,
371 "mailbox backpressure",
372 );
373 }
374}