Skip to main content

actr_framework/
context.rs

1//! Context trait - Execution context interface for actors
2
3use actr_protocol::{ActorResult, ActrId, ActrType, DataStream, PayloadType};
4use async_trait::async_trait;
5use futures_util::future::BoxFuture;
6
7// ── MaybeSendSync marker ────────────────────────────────────────────────
8//
9// Auto-trait-style marker that is `Send + Sync` on native targets and empty
10// on `wasm32`. Per Option U γ-unified §3.1 the user-facing `Context` bound
11// is `Clone + 'static`, but native runtime layers (tokio multi-thread,
12// `WorkloadHookObserver`) must produce `Send` futures; pinning `Send + Sync`
13// behind this marker lets the framework trait carry a single `?Send`
14// definition while silently re-asserting the native auto traits through a
15// cfg-gated blanket impl.
16
/// Marker trait standing in for `Send + Sync` on native targets.
///
/// `Context`, `Workload`, and `MessageDispatcher` list it as a supertrait so
/// that `async_trait` default bodies build in both modes while the
/// user-visible trait definitions stay free of explicit `Send` / `Sync`
/// bounds.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSendSync: Send + Sync {}

/// Marker trait standing in for `Send + Sync` on native targets; on `wasm32`
/// it carries no requirement at all.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSendSync {}

// Native: every `Send + Sync` type (sized or not) gets the marker for free.
#[cfg(not(target_arch = "wasm32"))]
impl<T> MaybeSendSync for T where T: Send + Sync + ?Sized {}

// wasm32: every type (sized or not) gets the marker for free.
#[cfg(target_arch = "wasm32")]
impl<T> MaybeSendSync for T where T: ?Sized {}
32
33/// Actor execution context interface.
34///
35/// Defines the complete interface for an actor to interact with the runtime:
36/// - Context data access (`self_id`, `request_id`, …)
37/// - Communication primitives (`call`, `tell`)
38///
39/// # Design principles
40///
41/// - **Interface only**: the framework provides no implementation; the runtime
42///   does.
43/// - **Generic parameter**: user code accepts `<C: Context>` rather than
44///   `&dyn Context`, so dispatch monomorphises and avoids vtables.
45///
46/// # cfg dispatch
47///
48/// The trait is `?Send` on `wasm32` (browser single-threaded, futures can
49/// legitimately not be `Send`) and `#[async_trait]` (Send mode) on native
50/// targets so tokio-multi-thread spawners downstream keep working. The
51/// `MaybeSendSync` supertrait adds `Send + Sync` on native only — per
52/// Option U γ-unified §3.1 the user-visible bound stays `Clone + 'static`
53/// while `Workload` default method bodies (which need `Send` futures under
54/// the native `async_trait`) compile without any extra constraint on the
55/// generic `C`.
56///
57/// # Example
58///
59/// ```rust,ignore
60/// async fn my_handler<C: Context>(ctx: &C) {
61///     let id = ctx.self_id();
62///     let response = ctx.call(&target, request).await?;
63/// }
64/// ```
65#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
66#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
67pub trait Context: Clone + MaybeSendSync + 'static {
68    // ========== Data Access Methods ==========
69
70    /// Get the current Actor's ID
71    fn self_id(&self) -> &ActrId;
72
73    /// Get the caller's Actor ID
74    ///
75    /// - `Some(caller_id)`: Called by another Actor
76    /// - `None`: System internal call (e.g., lifecycle hooks)
77    fn caller_id(&self) -> Option<&ActrId>;
78
79    /// Get the unique request ID
80    ///
81    /// A new request_id is generated for each RPC call, used to match requests and responses.
82    fn request_id(&self) -> &str;
83
84    // ========== Communication Methods ==========
85
86    /// Send a type-safe RPC request and wait for response
87    ///
88    /// This is the primary way to call other Actors, providing full type safety guarantees.
89    ///
90    /// # Type Inference
91    ///
92    /// Response type is automatically inferred from `R::Response`, no manual annotation needed:
93    ///
94    /// ```rust,ignore
95    /// let request = EchoRequest { message: "hello".to_string() };
96    /// let response: EchoResponse = ctx.call(&target, request).await?;
97    /// //              ^^^^^^^^^^^^ Inferred from EchoRequest::Response
98    /// ```
99    ///
100    /// # Error Handling
101    ///
102    /// - `ProtocolError::TransportError`: Network transport failure
103    /// - `ProtocolError::Actr(DecodeFailure)`: Response decode failure
104    /// - `ProtocolError::Actr(UnknownRoute)`: Route does not exist
105    /// - Errors returned by remote Actor's business logic
106    ///
107    /// # Parameters
108    ///
109    /// - `target`: Target destination (`Dest::Shell` for local, `Dest::Actor(id)` for remote)
110    /// - `request`: Request message implementing `RpcRequest` trait
111    ///
112    /// # Returns
113    ///
114    /// Returns response message of type `R::Response`
115    async fn call<R: actr_protocol::RpcRequest>(
116        &self,
117        target: &crate::Dest,
118        request: R,
119    ) -> ActorResult<R::Response>;
120
121    /// Send a type-safe one-way message (no response expected)
122    ///
123    /// Used for sending notifications, events, etc. that do not require a response.
124    ///
125    /// # Semantics
126    ///
127    /// - **Fire-and-forget**: Does not wait for response after sending
128    /// - **No delivery guarantee**: Message may be lost if target is unreachable
129    /// - **Low latency**: Does not block waiting for response
130    ///
131    /// # Parameters
132    ///
133    /// - `target`: Target destination (`Dest::Shell` for local, `Dest::Actor(id)` for remote)
134    /// - `message`: Message implementing `RpcRequest` trait
135    async fn tell<R: actr_protocol::RpcRequest>(
136        &self,
137        target: &crate::Dest,
138        message: R,
139    ) -> ActorResult<()>;
140
141    // ========== Fast Path: DataStream Methods ==========
142
143    /// Register a DataStream callback for a specific stream
144    ///
145    /// When a DataStream with matching stream_id arrives, the registered callback will be invoked.
146    /// Callbacks are executed concurrently and do not block other streams.
147    ///
148    /// # Parameters
149    ///
150    /// - `stream_id`: Stream identifier (must be globally unique)
151    /// - `callback`: Handler function that receives (DataStream, sender ActrId)
152    ///
153    /// # Example
154    ///
155    /// ```rust,ignore
156    /// ctx.register_stream("log-stream", |chunk, sender| {
157    ///     Box::pin(async move {
158    ///         println!("Received chunk {} from {:?}", chunk.sequence, sender);
159    ///         Ok(())
160    ///     })
161    /// }).await?;
162    /// ```
163    async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
164    where
165        F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static;
166
167    /// Unregister a DataStream callback
168    ///
169    /// # Parameters
170    ///
171    /// - `stream_id`: Stream identifier to unregister
172    async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()>;
173
174    /// Send a DataStream to a destination with explicit lane selection.
175    ///
176    /// Use [`PayloadType::StreamReliable`] for ordered reliable delivery (default) or
177    /// [`PayloadType::StreamLatencyFirst`] for low-latency partial-reliable delivery.
178    ///
179    /// # Parameters
180    ///
181    /// - `target`: Target destination
182    /// - `chunk`: DataStream to send
183    /// - `payload_type`: Lane selection (`StreamReliable` or `StreamLatencyFirst`)
184    async fn send_data_stream(
185        &self,
186        target: &crate::Dest,
187        chunk: DataStream,
188        payload_type: PayloadType,
189    ) -> ActorResult<()>;
190
191    /// Discover a remote Actor of the specified type via the signaling server.
192    ///
193    /// Returns a route candidate or an error if none are available. Concrete
194    /// selection strategy is decided by the Context implementation.
195    async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId>;
196
197    /// Send a raw RPC request (untyped bytes) and wait for response
198    ///
199    /// This is a lower-level method for dynamic dispatch scenarios where the
200    /// request/response types are not known at compile time (e.g., FFI bindings).
201    ///
202    /// # Parameters
203    ///
204    /// - `target`: Target Actor ID
205    /// - `route_key`: Route key (e.g., "echo.EchoService/Echo")
206    /// - `payload`: Raw request payload bytes
207    ///
208    /// # Returns
209    ///
210    /// Returns raw response payload bytes
211    ///
212    /// # Example
213    ///
214    /// ```rust,ignore
215    /// // For FFI or dynamic dispatch scenarios
216    /// let response = ctx.call_raw(
217    ///     &target_id,
218    ///     "echo.EchoService/Echo",
219    ///     request_bytes.into(),
220    /// ).await?;
221    /// ```
222    async fn call_raw(
223        &self,
224        target: &ActrId,
225        route_key: &str,
226        payload: bytes::Bytes,
227    ) -> ActorResult<bytes::Bytes>;
228
229    // ========== Fast Path: MediaTrack Methods (WebRTC Native) ==========
230
231    /// Register a WebRTC native media track callback
232    ///
233    /// When media samples arrive on the specified track, the registered callback will be invoked.
234    /// Uses WebRTC native RTCTrackRemote, no protobuf serialization overhead.
235    ///
236    /// # Parameters
237    ///
238    /// - `track_id`: Media track identifier (must match WebRTC track ID in SDP)
239    /// - `callback`: Handler function that receives native media samples
240    ///
241    /// # Example
242    ///
243    /// ```rust,ignore
244    /// use actr_framework::MediaSample;
245    ///
246    /// ctx.register_media_track("video-track-1", |sample, sender| {
247    ///     Box::pin(async move {
248    ///         // Decode and render video frame (native RTP payload)
249    ///         println!("Received {} bytes at timestamp {}",
250    ///                  sample.data.len(), sample.timestamp);
251    ///         decoder.decode(&sample.data).await?;
252    ///         Ok(())
253    ///     })
254    /// }).await?;
255    /// ```
256    ///
257    /// # Architecture Note
258    ///
259    /// MediaTrack uses WebRTC native RTP channels (RTCTrackRemote), NOT DataChannel.
260    /// This provides:
261    /// - Zero protobuf serialization overhead
262    /// - Native RTP header information (timestamp, SSRC, etc.)
263    /// - Optimal latency (~1-2ms lower than DataChannel)
264    async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
265    where
266        F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static;
267
268    /// Unregister a media track callback
269    ///
270    /// # Parameters
271    ///
272    /// - `track_id`: Media track identifier to unregister
273    async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()>;
274
275    /// Send media samples via WebRTC native track
276    ///
277    /// Sends raw media samples through WebRTC RTCRtpSender (native RTP).
278    /// This is much more efficient than sending through DataChannel.
279    ///
280    /// # Parameters
281    ///
282    /// - `target`: Target destination
283    /// - `track_id`: Track identifier
284    /// - `sample`: Media sample to send
285    ///
286    /// # Example
287    ///
288    /// ```rust,ignore
289    /// use actr_framework::{MediaSample, MediaType};
290    ///
291    /// let sample = MediaSample {
292    ///     data: encoded_frame.into(),
293    ///     timestamp: rtp_timestamp,
294    ///     codec: "H264".to_string(),
295    ///     media_type: MediaType::Video,
296    /// };
297    ///
298    /// ctx.send_media_sample(&target, "video-track-1", sample).await?;
299    /// ```
300    async fn send_media_sample(
301        &self,
302        target: &crate::Dest,
303        track_id: &str,
304        sample: MediaSample,
305    ) -> ActorResult<()>;
306
307    /// Add a media track to the WebRTC connection with the target
308    ///
309    /// Creates a new RTP track on the PeerConnection and triggers SDP renegotiation.
310    /// Must be called before `send_media_sample()` for the given track.
311    ///
312    /// # Parameters
313    ///
314    /// - `target`: Target destination
315    /// - `track_id`: Media track identifier
316    /// - `codec`: Codec name (e.g., "VP8", "H264", "OPUS")
317    /// - `media_type`: Media type ("video" or "audio")
318    async fn add_media_track(
319        &self,
320        target: &crate::Dest,
321        track_id: &str,
322        codec: &str,
323        media_type: &str,
324    ) -> ActorResult<()>;
325
326    /// Remove a media track from the WebRTC connection with the target.
327    ///
328    /// If the track exists, this removes the RTP sender from the PeerConnection
329    /// and triggers SDP renegotiation so repeated start/stop cycles do not keep
330    /// stale tracks alive on the connection.
331    async fn remove_media_track(&self, target: &crate::Dest, track_id: &str) -> ActorResult<()>;
332
333    // ========== Observation ==========
334
335    /// Emit a log record from the workload side, routed through whichever
336    /// observability pipeline the runtime installs.
337    ///
338    /// The default implementation forwards to `tracing` using the configured
339    /// `target = "actr_framework::workload"`. Runtimes that embed workloads
340    /// in environments without `tracing` (e.g. `wasm32-unknown-unknown`
341    /// running in a browser Service Worker) override this to surface records
342    /// through whatever host hook is available (`console.log`, host import, ...).
343    fn log(&self, level: LogLevel, msg: &str) {
344        match level {
345            LogLevel::Trace => tracing::trace!(target: "actr_framework::workload", "{msg}"),
346            LogLevel::Debug => tracing::debug!(target: "actr_framework::workload", "{msg}"),
347            LogLevel::Info => tracing::info!(target: "actr_framework::workload", "{msg}"),
348            LogLevel::Warn => tracing::warn!(target: "actr_framework::workload", "{msg}"),
349            LogLevel::Error => tracing::error!(target: "actr_framework::workload", "{msg}"),
350        }
351    }
352}
353
/// Severity attached to a record passed to [`Context::log`].
///
/// The five variants match the standard levels of `tracing` / `log`, which
/// lets a runtime translate them to whatever sink its target offers (native
/// `tracing` subscriber, browser `console.*`, host-import log channel, ...).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogLevel {
    /// Very fine-grained diagnostics; typically switched off in production.
    Trace,
    /// Fine-grained detail that helps while debugging.
    Debug,
    /// Informational messages marking normal operation.
    Info,
    /// Unexpected conditions that do not stop operation from continuing.
    Warn,
    /// Failures that likely need operator attention.
    Error,
}
374
375/// Media sample data from WebRTC native track
376///
377/// Lightweight wrapper around WebRTC native RTP sample.
378#[derive(Clone)]
379pub struct MediaSample {
380    /// Raw sample data (encoded audio/video frame)
381    pub data: bytes::Bytes,
382
383    /// Sample timestamp (from RTP timestamp)
384    pub timestamp: u32,
385
386    /// Codec-specific information
387    pub codec: String,
388
389    /// Media type (audio or video)
390    pub media_type: MediaType,
391}
392
/// Kind of media carried by a [`MediaSample`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum MediaType {
    /// Audio media.
    Audio,
    /// Video media.
    Video,
}