actr_framework/context.rs
1//! Context trait - Execution context interface for actors
2
3use actr_protocol::{ActorResult, ActrId, ActrType, DataStream, PayloadType};
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6
7// ── MaybeSendSync marker ────────────────────────────────────────────────
8//
9// Auto-trait-style marker that is `Send + Sync` on native targets and empty
10// on `wasm32`. Per Option U γ-unified §3.1 the user-facing `Context` bound
11// is `Clone + 'static`, but native runtime layers (tokio multi-thread,
12// `WorkloadHookObserver`) must produce `Send` futures; pinning `Send + Sync`
13// behind this marker lets the framework trait carry a single `?Send`
14// definition while silently re-asserting the native auto traits through a
15// cfg-gated blanket impl.
16
17/// Auto-trait-style marker — `Send + Sync` on native, empty on `wasm32`.
18///
19/// Used as a supertrait on `Context`, `Workload`, and `MessageDispatcher`
20/// so `async_trait` default bodies compile in both modes without adding
21/// explicit `Send` / `Sync` bounds to the user-visible trait definition.
22#[cfg(not(target_arch = "wasm32"))]
23pub trait MaybeSendSync: Send + Sync {}
24#[cfg(not(target_arch = "wasm32"))]
25impl<T: Send + Sync + ?Sized> MaybeSendSync for T {}
26
27/// Auto-trait-style marker — `Send + Sync` on native, empty on `wasm32`.
28#[cfg(target_arch = "wasm32")]
29pub trait MaybeSendSync {}
30#[cfg(target_arch = "wasm32")]
31impl<T: ?Sized> MaybeSendSync for T {}
32
33/// Actor execution context interface.
34///
35/// Defines the complete interface for an actor to interact with the runtime:
36/// - Context data access (`self_id`, `request_id`, …)
37/// - Communication primitives (`call`, `tell`)
38///
39/// # Design principles
40///
41/// - **Interface only**: the framework provides no implementation; the runtime
42/// does.
43/// - **Generic parameter**: user code accepts `<C: Context>` rather than
44/// `&dyn Context`, so dispatch monomorphises and avoids vtables.
45///
46/// # cfg dispatch
47///
48/// The trait is `?Send` on `wasm32` (browser single-threaded, futures can
49/// legitimately not be `Send`) and `#[async_trait]` (Send mode) on native
50/// targets so tokio-multi-thread spawners downstream keep working. The
51/// `MaybeSendSync` supertrait adds `Send + Sync` on native only — per
52/// Option U γ-unified §3.1 the user-visible bound stays `Clone + 'static`
53/// while `Workload` default method bodies (which need `Send` futures under
54/// the native `async_trait`) compile without any extra constraint on the
55/// generic `C`.
56///
57/// # Example
58///
59/// ```rust,ignore
60/// async fn my_handler<C: Context>(ctx: &C) {
61/// let id = ctx.self_id();
62/// let response = ctx.call(&target, request).await?;
63/// }
64/// ```
65#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
66#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
67pub trait Context: Clone + MaybeSendSync + 'static {
68 // ========== Data Access Methods ==========
69
70 /// Get the current Actor's ID
71 fn self_id(&self) -> &ActrId;
72
73 /// Get the caller's Actor ID
74 ///
75 /// - `Some(caller_id)`: Called by another Actor
76 /// - `None`: System internal call (e.g., lifecycle hooks)
77 fn caller_id(&self) -> Option<&ActrId>;
78
79 /// Get the unique request ID
80 ///
81 /// A new request_id is generated for each RPC call, used to match requests and responses.
82 fn request_id(&self) -> &str;
83
84 // ========== Communication Methods ==========
85
86 /// Send a type-safe RPC request and wait for response
87 ///
88 /// This is the primary way to call other Actors, providing full type safety guarantees.
89 ///
90 /// # Type Inference
91 ///
92 /// Response type is automatically inferred from `R::Response`, no manual annotation needed:
93 ///
94 /// ```rust,ignore
95 /// let request = EchoRequest { message: "hello".to_string() };
96 /// let response: EchoResponse = ctx.call(&target, request).await?;
97 /// // ^^^^^^^^^^^^ Inferred from EchoRequest::Response
98 /// ```
99 ///
100 /// # Error Handling
101 ///
102 /// - `ProtocolError::TransportError`: Network transport failure
103 /// - `ProtocolError::Actr(DecodeFailure)`: Response decode failure
104 /// - `ProtocolError::Actr(UnknownRoute)`: Route does not exist
105 /// - Errors returned by remote Actor's business logic
106 ///
107 /// # Parameters
108 ///
109 /// - `target`: Target destination (`Dest::Shell` for local, `Dest::Actor(id)` for remote)
110 /// - `request`: Request message implementing `RpcRequest` trait
111 ///
112 /// # Returns
113 ///
114 /// Returns response message of type `R::Response`
115 async fn call<R: actr_protocol::RpcRequest>(
116 &self,
117 target: &crate::Dest,
118 request: R,
119 ) -> ActorResult<R::Response>;
120
121 /// Send a type-safe one-way message (no response expected)
122 ///
123 /// Used for sending notifications, events, etc. that do not require a response.
124 ///
125 /// # Semantics
126 ///
127 /// - **Fire-and-forget**: Does not wait for response after sending
128 /// - **No delivery guarantee**: Message may be lost if target is unreachable
129 /// - **Low latency**: Does not block waiting for response
130 ///
131 /// # Parameters
132 ///
133 /// - `target`: Target destination (`Dest::Shell` for local, `Dest::Actor(id)` for remote)
134 /// - `message`: Message implementing `RpcRequest` trait
135 async fn tell<R: actr_protocol::RpcRequest>(
136 &self,
137 target: &crate::Dest,
138 message: R,
139 ) -> ActorResult<()>;
140
141 // ========== Fast Path: DataStream Methods ==========
142
143 /// Register a DataStream callback for a specific stream
144 ///
145 /// When a DataStream with matching stream_id arrives, the registered callback will be invoked.
146 /// Callbacks are executed concurrently and do not block other streams.
147 ///
148 /// # Parameters
149 ///
150 /// - `stream_id`: Stream identifier (must be globally unique)
151 /// - `callback`: Handler function that receives (DataStream, sender ActrId)
152 ///
153 /// # Example
154 ///
155 /// ```rust,ignore
156 /// ctx.register_stream("log-stream", |chunk, sender| {
157 /// Box::pin(async move {
158 /// println!("Received chunk {} from {:?}", chunk.sequence, sender);
159 /// Ok(())
160 /// })
161 /// }).await?;
162 /// ```
163 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
164 where
165 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static;
166
167 /// Unregister a DataStream callback
168 ///
169 /// # Parameters
170 ///
171 /// - `stream_id`: Stream identifier to unregister
172 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()>;
173
174 /// Send a DataStream to a destination with explicit lane selection.
175 ///
176 /// Use [`PayloadType::StreamReliable`] for ordered reliable delivery (default) or
177 /// [`PayloadType::StreamLatencyFirst`] for low-latency partial-reliable delivery.
178 ///
179 /// # Parameters
180 ///
181 /// - `target`: Target destination
182 /// - `chunk`: DataStream to send
183 /// - `payload_type`: Lane selection (`StreamReliable` or `StreamLatencyFirst`)
184 async fn send_data_stream(
185 &self,
186 target: &crate::Dest,
187 chunk: DataStream,
188 payload_type: PayloadType,
189 ) -> ActorResult<()>;
190
191 /// Discover a remote Actor of the specified type via the signaling server.
192 ///
193 /// Returns a route candidate or an error if none are available. Concrete
194 /// selection strategy is decided by the Context implementation.
195 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId>;
196
197 /// Send a raw RPC request (untyped bytes) and wait for response
198 ///
199 /// This is a lower-level method for dynamic dispatch scenarios where the
200 /// request/response types are not known at compile time (e.g., FFI bindings).
201 ///
202 /// # Parameters
203 ///
204 /// - `target`: Target Actor ID
205 /// - `route_key`: Route key (e.g., "echo.EchoService/Echo")
206 /// - `payload`: Raw request payload bytes
207 ///
208 /// # Returns
209 ///
210 /// Returns raw response payload bytes
211 ///
212 /// # Example
213 ///
214 /// ```rust,ignore
215 /// // For FFI or dynamic dispatch scenarios
216 /// let response = ctx.call_raw(
217 /// &target_id,
218 /// "echo.EchoService/Echo",
219 /// request_bytes.into(),
220 /// ).await?;
221 /// ```
222 async fn call_raw(
223 &self,
224 target: &ActrId,
225 route_key: &str,
226 payload: bytes::Bytes,
227 ) -> ActorResult<bytes::Bytes>;
228
229 // ========== Fast Path: MediaTrack Methods (WebRTC Native) ==========
230
231 /// Register a WebRTC native media track callback
232 ///
233 /// When media samples arrive on the specified track, the registered callback will be invoked.
234 /// Uses WebRTC native RTCTrackRemote, no protobuf serialization overhead.
235 ///
236 /// # Parameters
237 ///
238 /// - `track_id`: Media track identifier (must match WebRTC track ID in SDP)
239 /// - `callback`: Handler function that receives native media samples
240 ///
241 /// # Example
242 ///
243 /// ```rust,ignore
244 /// use actr_framework::MediaSample;
245 ///
246 /// ctx.register_media_track("video-track-1", |sample, sender| {
247 /// Box::pin(async move {
248 /// // Decode and render video frame (native RTP payload)
249 /// println!("Received {} bytes at timestamp {}",
250 /// sample.data.len(), sample.timestamp);
251 /// decoder.decode(&sample.data).await?;
252 /// Ok(())
253 /// })
254 /// }).await?;
255 /// ```
256 ///
257 /// # Architecture Note
258 ///
259 /// MediaTrack uses WebRTC native RTP channels (RTCTrackRemote), NOT DataChannel.
260 /// This provides:
261 /// - Zero protobuf serialization overhead
262 /// - Native RTP header information (timestamp, SSRC, etc.)
263 /// - Optimal latency (~1-2ms lower than DataChannel)
264 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
265 where
266 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static;
267
268 /// Unregister a media track callback
269 ///
270 /// # Parameters
271 ///
272 /// - `track_id`: Media track identifier to unregister
273 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()>;
274
275 /// Send media samples via WebRTC native track
276 ///
277 /// Sends raw media samples through WebRTC RTCRtpSender (native RTP).
278 /// This is much more efficient than sending through DataChannel.
279 ///
280 /// # Parameters
281 ///
282 /// - `target`: Target destination
283 /// - `track_id`: Track identifier
284 /// - `sample`: Media sample to send
285 ///
286 /// # Example
287 ///
288 /// ```rust,ignore
289 /// use actr_framework::{MediaSample, MediaType};
290 ///
291 /// let sample = MediaSample {
292 /// data: encoded_frame.into(),
293 /// timestamp: rtp_timestamp,
294 /// codec: "H264".to_string(),
295 /// media_type: MediaType::Video,
296 /// };
297 ///
298 /// ctx.send_media_sample(&target, "video-track-1", sample).await?;
299 /// ```
300 async fn send_media_sample(
301 &self,
302 target: &crate::Dest,
303 track_id: &str,
304 sample: MediaSample,
305 ) -> ActorResult<()>;
306
307 /// Add a media track to the WebRTC connection with the target
308 ///
309 /// Creates a new RTP track on the PeerConnection and triggers SDP renegotiation.
310 /// Must be called before `send_media_sample()` for the given track.
311 ///
312 /// # Parameters
313 ///
314 /// - `target`: Target destination
315 /// - `track_id`: Media track identifier
316 /// - `codec`: Codec name (e.g., "VP8", "H264", "OPUS")
317 /// - `media_type`: Media type ("video" or "audio")
318 async fn add_media_track(
319 &self,
320 target: &crate::Dest,
321 track_id: &str,
322 codec: &str,
323 media_type: &str,
324 ) -> ActorResult<()>;
325
326 /// Remove a media track from the WebRTC connection with the target.
327 ///
328 /// If the track exists, this removes the RTP sender from the PeerConnection
329 /// and triggers SDP renegotiation so repeated start/stop cycles do not keep
330 /// stale tracks alive on the connection.
331 async fn remove_media_track(&self, target: &crate::Dest, track_id: &str) -> ActorResult<()>;
332
333 // ========== Observation ==========
334
335 /// Emit a log record from the workload side, routed through whichever
336 /// observability pipeline the runtime installs.
337 ///
338 /// The default implementation forwards to `tracing` using the configured
339 /// `target = "actr_framework::workload"`. Runtimes that embed workloads
340 /// in environments without `tracing` (e.g. `wasm32-unknown-unknown`
341 /// running in a browser Service Worker) override this to surface records
342 /// through whatever host hook is available (`console.log`, host import, ...).
343 fn log(&self, level: LogLevel, msg: &str) {
344 match level {
345 LogLevel::Trace => tracing::trace!(target: "actr_framework::workload", "{msg}"),
346 LogLevel::Debug => tracing::debug!(target: "actr_framework::workload", "{msg}"),
347 LogLevel::Info => tracing::info!(target: "actr_framework::workload", "{msg}"),
348 LogLevel::Warn => tracing::warn!(target: "actr_framework::workload", "{msg}"),
349 LogLevel::Error => tracing::error!(target: "actr_framework::workload", "{msg}"),
350 }
351 }
352}
353
354/// Severity level for [`Context::log`].
355///
356/// Mirrors the five standard levels exposed by `tracing` / `log` so runtimes
357/// can map to whatever sink is available on the target (native `tracing`
358/// subscriber, browser `console.*`, host-import log channel, ...).
359#[derive(Debug, Clone, Copy, PartialEq, Eq)]
360pub enum LogLevel {
361 /// Very fine-grained diagnostic information, typically disabled in
362 /// production.
363 Trace,
364 /// Fine-grained information useful while debugging.
365 Debug,
366 /// Informational messages that mark normal operation.
367 Info,
368 /// Conditions that are unexpected but do not prevent continued
369 /// operation.
370 Warn,
371 /// Failures that likely require operator attention.
372 Error,
373}
374
375/// Media sample data from WebRTC native track
376///
377/// Lightweight wrapper around WebRTC native RTP sample.
378#[derive(Clone)]
379pub struct MediaSample {
380 /// Raw sample data (encoded audio/video frame)
381 pub data: bytes::Bytes,
382
383 /// Sample timestamp (from RTP timestamp)
384 pub timestamp: u32,
385
386 /// Codec-specific information
387 pub codec: String,
388
389 /// Media type (audio or video)
390 pub media_type: MediaType,
391}
392
393/// Media type enum
394#[derive(Clone, Copy, Debug, PartialEq, Eq)]
395pub enum MediaType {
396 Audio,
397 Video,
398}