// liminal/channel/types.rs

//! Public channel surface: [`ChannelConfig`], [`ChannelMode`], [`ChannelDelivery`],
//! and the cloneable [`ChannelHandle`] that drives a REAL supervised beamr
//! channel actor (LIM-002).
//!
//! The handle is a thin, synchronous-looking facade over the process-backed
//! actor: every operation enqueues a typed command onto the actor's mailbox and
//! blocks on a per-command reply (the haematite `ShardHandle` pattern). It owns
//! no subscriber state and performs no local fan-out itself — the actor process
//! does; the handle only forwards cross-node events to an installed cluster
//! observer.
//!
//! `ChannelHandle::new` stays infallible (existing call-sites depend on it): the
//! actor is spawned lazily on first use and the spawn result is memoised, so a
//! scheduler failure surfaces as a `LiminalError` from the first operation
//! rather than as a panic.

15use std::sync::atomic::AtomicU32;
16use std::sync::{Arc, Mutex, OnceLock};
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use serde_json::Value;
20
21use crate::causal::CausalContext;
22use crate::channel::actor::{ChannelActorCore, predicate_from};
23use crate::channel::observer::ClusterObserver;
24use crate::channel::schema::{Schema, SchemaId, SchemaValidationError};
25use crate::channel::subscription::{SubscriptionHandle, SubscriptionPredicate};
26use crate::channel::supervisor::{ChannelSupervisor, shared_supervisor};
27use crate::durability::bridge::block_on;
28use crate::durability::{DurableChannel, DurableStore, MessageEnvelope};
29use crate::envelope::{Envelope, PublisherId};
30use crate::error::LiminalError;
31
/// Number of partitions given to the `DurableChannel` behind a flat
/// (unpartitioned) runtime channel: exactly one.
const RUNTIME_DURABLE_PARTITIONS: usize = 1;
34
/// Delivery acknowledgement produced by [`ChannelHandle::publish_with_delivery`].
///
/// This is not a backpressure signal. Rather than reporting whether a message
/// was admitted to the bus, it counts the local subscribers that actually
/// received it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChannelDelivery {
    /// Local subscribers that received the published message.
    delivered_count: usize,
}

impl ChannelDelivery {
    /// How many local subscribers genuinely received the message.
    #[must_use]
    pub const fn delivered_count(&self) -> usize {
        self.delivered_count
    }

    /// `true` when at least one subscriber accepted the message.
    #[must_use]
    pub const fn is_delivered(&self) -> bool {
        self.delivered_count() != 0
    }
}
57
/// Defines whether a channel is memory-only or durable across restarts.
///
/// The mode is declarative: the [`ChannelHandle`] constructors in this module
/// do not read it. `ChannelHandle::new` always yields an ephemeral handle,
/// whatever the mode. A store-backed handle is obtained only through
/// `ChannelHandle::new_durable` or `ChannelHandle::new_durable_with_supervisor`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelMode {
    /// In-memory channel mode with no persistence overhead.
    Ephemeral,
    /// Durable channel mode: published payloads are persisted to a backing
    /// `DurableStore` when the handle is created through a durable constructor.
    Durable,
}
66
67/// Compatibility alias for the channel-owned schema definition.
68pub type SchemaRef = Schema;
69
70/// Required configuration for creating a typed channel.
71#[derive(Clone, Debug)]
72pub struct ChannelConfig {
73    /// Explicit channel name.
74    pub name: String,
75    /// Explicit schema for validating published payloads.
76    pub schema: Schema,
77    /// Explicit durability mode for the channel.
78    pub mode: ChannelMode,
79}
80
81impl ChannelConfig {
82    /// Creates channel configuration from its required fields.
83    #[must_use]
84    pub const fn new(name: String, schema: Schema, mode: ChannelMode) -> Self {
85        Self { name, schema, mode }
86    }
87}
88
89/// A lazily-spawned, supervised channel actor shared by every clone of a handle.
90///
91/// `supervisor` is stored as a `Result` so [`ChannelHandle::new`] can stay
92/// infallible: a scheduler-start failure is captured here and surfaced as a
93/// `LiminalError` the first time the actor is actually used.
94struct ChannelActorState {
95    supervisor: Result<ChannelSupervisor, String>,
96    core: OnceLock<Result<Arc<ChannelActorCore>, String>>,
97    restarts: AtomicU32,
98}
99
100impl ChannelActorState {
101    const fn new(supervisor: Result<ChannelSupervisor, String>) -> Self {
102        Self {
103            supervisor,
104            core: OnceLock::new(),
105            restarts: AtomicU32::new(0),
106        }
107    }
108
109    fn supervisor(&self) -> Result<&ChannelSupervisor, LiminalError> {
110        self.supervisor
111            .as_ref()
112            .map_err(|message| LiminalError::PublishFailed {
113                message: format!("channel supervisor unavailable: {message}"),
114            })
115    }
116
117    /// The installed cluster observer, if this channel runs on a clustered
118    /// supervisor (SRV-005). Returns `None` for non-clustered channels.
119    fn observer(&self) -> Option<Arc<dyn ClusterObserver>> {
120        self.supervisor
121            .as_ref()
122            .ok()
123            .and_then(|supervisor| supervisor.observer().cloned())
124    }
125
126    /// Returns the live actor core, spawning it (and any restart) on demand.
127    fn core(&self, schema: &Schema) -> Result<Arc<ChannelActorCore>, LiminalError> {
128        let supervisor = self.supervisor()?;
129        let stored = self.core.get_or_init(|| {
130            supervisor
131                .spawn_channel(schema.clone())
132                .map_err(|error| error.to_string())
133        });
134        let core = stored
135            .as_ref()
136            .map_err(|message| LiminalError::PublishFailed {
137                message: format!("channel actor unavailable: {message}"),
138            })?;
139        // Restart on a dead pid (R4) before returning the core for use.
140        supervisor.ensure_running(core, &self.restarts)?;
141        Ok(Arc::clone(core))
142    }
143}
144
145impl std::fmt::Debug for ChannelActorState {
146    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        formatter
148            .debug_struct("ChannelActorState")
149            .field("supervisor", &self.supervisor)
150            .finish_non_exhaustive()
151    }
152}
153
154/// Cloneable handle for interacting with a channel actor process.
155#[derive(Clone, Debug)]
156pub struct ChannelHandle {
157    config: ChannelConfig,
158    actor: Arc<ChannelActorState>,
159    durable: Option<Arc<Mutex<DurableChannel>>>,
160}
161
162impl ChannelHandle {
163    /// Creates an ephemeral handle backed by a real supervised channel actor on
164    /// the shared default supervisor.
165    ///
166    /// The actor process is spawned lazily on first use; a scheduler failure is
167    /// surfaced as a [`LiminalError`] from the first operation, not a panic.
168    #[must_use]
169    pub fn new(config: ChannelConfig) -> Self {
170        let supervisor = shared_supervisor().map_err(|error| error.to_string());
171        Self {
172            config,
173            actor: Arc::new(ChannelActorState::new(supervisor)),
174            durable: None,
175        }
176    }
177
178    /// Creates an ephemeral handle bound to an explicit `supervisor` (isolation
179    /// for the registry and tests).
180    #[must_use]
181    pub fn with_supervisor(config: ChannelConfig, supervisor: ChannelSupervisor) -> Self {
182        Self {
183            config,
184            actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
185            durable: None,
186        }
187    }
188
189    /// Creates a durable handle that persists every accepted publish to `store`
190    /// before fanning it out to subscribers.
191    ///
192    /// # Errors
193    ///
194    /// Returns [`LiminalError::PublishFailed`] when the durable channel cannot be
195    /// initialized over `store`.
196    pub fn new_durable(
197        config: ChannelConfig,
198        store: Arc<dyn DurableStore>,
199    ) -> Result<Self, LiminalError> {
200        let durable = DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store)
201            .map_err(|error| LiminalError::PublishFailed {
202            message: format!(
203                "failed to initialize durable channel '{}': {error}",
204                config.name
205            ),
206        })?;
207        let supervisor = shared_supervisor()?;
208        Ok(Self {
209            config,
210            actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
211            durable: Some(Arc::new(Mutex::new(durable))),
212        })
213    }
214
215    /// Creates a durable handle bound to an explicit `supervisor`.
216    ///
217    /// Used by the standalone server so every channel — durable or ephemeral —
218    /// shares ONE (optionally clustered) supervisor and thus one scheduler, which
219    /// is the precondition for cross-node delivery (SRV-005): a subscriber pid
220    /// joined to a channel's distributed process group must live on the same
221    /// scheduler that owns the distribution links.
222    ///
223    /// # Errors
224    ///
225    /// Returns [`LiminalError::PublishFailed`] when the durable channel cannot be
226    /// initialized over `store`.
227    pub fn new_durable_with_supervisor(
228        config: ChannelConfig,
229        store: Arc<dyn DurableStore>,
230        supervisor: ChannelSupervisor,
231    ) -> Result<Self, LiminalError> {
232        let durable = DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store)
233            .map_err(|error| LiminalError::PublishFailed {
234            message: format!(
235                "failed to initialize durable channel '{}': {error}",
236                config.name
237            ),
238        })?;
239        Ok(Self {
240            config,
241            actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
242            durable: Some(Arc::new(Mutex::new(durable))),
243        })
244    }
245
246    /// Returns the channel configuration used to create this handle.
247    #[must_use]
248    pub const fn config(&self) -> &ChannelConfig {
249        &self.config
250    }
251
252    /// Publishes a payload to the channel with the default publisher identity.
253    ///
254    /// # Errors
255    ///
256    /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
257    pub fn publish<Payload>(&self, payload: Payload) -> Result<(), LiminalError>
258    where
259        Payload: AsRef<[u8]>,
260    {
261        self.publish_with_context(payload, PublisherId::default(), None)
262    }
263
264    /// Publishes a payload with an explicit publisher identity.
265    ///
266    /// # Errors
267    ///
268    /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
269    pub fn publish_from<Payload>(
270        &self,
271        publisher_id: impl Into<PublisherId>,
272        payload: Payload,
273    ) -> Result<(), LiminalError>
274    where
275        Payload: AsRef<[u8]>,
276    {
277        self.publish_with_context(payload, publisher_id.into(), None)
278    }
279
280    /// Publishes a payload with explicit publisher and causal metadata.
281    ///
282    /// # Errors
283    ///
284    /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
285    pub fn publish_with_context<Payload>(
286        &self,
287        payload: Payload,
288        publisher_id: PublisherId,
289        causal_context: Option<CausalContext>,
290    ) -> Result<(), LiminalError>
291    where
292        Payload: AsRef<[u8]>,
293    {
294        self.publish_with_delivery(payload, publisher_id, causal_context)
295            .map(|_delivery| ())
296    }
297
298    /// Publishes a payload and reports a genuine delivery ack.
299    ///
300    /// Returns a [`ChannelDelivery`] whose `delivered_count` is the number of
301    /// local subscribers the message was actually delivered to. A caller that
302    /// needs to know the message was ACCEPTED by a subscriber (not merely
303    /// buffered/published) inspects [`ChannelDelivery::is_delivered`]. This is the
304    /// channel-library half of the 13-L1 delivery-ack signal; the publish-without-
305    /// delivery methods stay unchanged for existing callers.
306    ///
307    /// # Errors
308    ///
309    /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
310    pub fn publish_with_delivery<Payload>(
311        &self,
312        payload: Payload,
313        publisher_id: PublisherId,
314        causal_context: Option<CausalContext>,
315    ) -> Result<ChannelDelivery, LiminalError>
316    where
317        Payload: AsRef<[u8]>,
318    {
319        // Durable channels persist the message to the store BEFORE acknowledging
320        // the publish (and before fanning out): a published message that was not
321        // durably recorded would be lost on shutdown, which CN7 forbids.
322        if let Some(durable) = self.durable.as_ref() {
323            self.persist_durable(durable, payload.as_ref(), &publisher_id)?;
324        }
325        let core = self.core()?;
326        let outcome = core.publish(payload.as_ref().to_vec(), publisher_id, causal_context)?;
327        // SRV-005: hand the normalised envelope to the cluster observer so it can
328        // fan the message out to remote subscribers. Local fan-out already
329        // happened inside `core.publish`; this is purely the cross-node leg and
330        // is a no-op when no observer is installed (non-clustered channels).
331        if let Some(observer) = self.actor.observer() {
332            observer.on_publish(&self.config.name, &outcome.envelope);
333        }
334        Ok(ChannelDelivery {
335            delivered_count: outcome.delivered_count,
336        })
337    }
338
339    fn persist_durable(
340        &self,
341        durable: &Arc<Mutex<DurableChannel>>,
342        payload: &[u8],
343        publisher_id: &PublisherId,
344    ) -> Result<(), LiminalError> {
345        let envelope = MessageEnvelope {
346            payload: payload.to_vec(),
347            causal_context: None,
348            timestamp: now_millis(),
349            publisher_id: publisher_id.as_str().to_owned(),
350            idempotency_key: None,
351        };
352        let publish_result = {
353            let mut channel = durable
354                .lock()
355                .map_err(|error| LiminalError::PublishFailed {
356                    message: format!("durable channel state unavailable: {error}"),
357                })?;
358            block_on(channel.publish(&envelope))
359        };
360        publish_result
361            .map_err(|error| LiminalError::PublishFailed {
362                message: format!(
363                    "durable publish bridge for channel '{}' failed: {error}",
364                    self.config.name
365                ),
366            })?
367            .map_err(|error| LiminalError::PublishFailed {
368                message: format!(
369                    "durable publish to channel '{}' failed: {error}",
370                    self.config.name
371                ),
372            })?;
373        Ok(())
374    }
375
376    /// Returns the schema version currently owned by the channel actor.
377    ///
378    /// # Errors
379    ///
380    /// Returns a [`LiminalError`] when the channel actor cannot be read.
381    pub fn current_schema_id(&self) -> Result<SchemaId, LiminalError> {
382        self.core()?.schema_id()
383    }
384
385    /// Evolves the channel schema by adding a defaulted field without disconnecting subscribers.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`SchemaValidationError`] when the schema cannot be evolved.
390    pub fn evolve_schema_add_field(
391        &self,
392        name: impl Into<String>,
393        field_schema: Value,
394        default: Value,
395    ) -> Result<SchemaId, SchemaValidationError> {
396        let core = self
397            .core()
398            .map_err(|error| SchemaValidationError::InvalidSchema {
399                message: error.to_string(),
400            })?;
401        core.evolve(name.into(), field_schema, default)
402    }
403
404    /// Subscribes to the channel, receiving every published message.
405    ///
406    /// # Errors
407    ///
408    /// Returns a [`LiminalError`] when a subscription cannot be created.
409    pub fn subscribe(&self) -> Result<SubscriptionHandle, LiminalError> {
410        self.subscribe_inner(None)
411    }
412
413    /// Subscribes with a delivery predicate: only messages for which `predicate`
414    /// returns `true` are delivered to this subscriber. The predicate is owned
415    /// and evaluated by the actor process (R3).
416    ///
417    /// # Clustering
418    ///
419    /// The predicate filters **local-node publishes only**. Under clustering
420    /// (SRV-005), messages published on a remote node are delivered to this
421    /// subscriber *ungated* — the predicate is a non-serializable closure and is
422    /// not propagated across the wire, so remote nodes cannot evaluate it. If you
423    /// need filtering to hold for cross-node traffic, filter again on receipt
424    /// rather than relying on this predicate alone.
425    ///
426    /// # Errors
427    ///
428    /// Returns a [`LiminalError`] when a subscription cannot be created.
429    pub fn subscribe_filtered<F>(&self, predicate: F) -> Result<SubscriptionHandle, LiminalError>
430    where
431        F: Fn(&Envelope) -> bool + Send + Sync + 'static,
432    {
433        self.subscribe_inner(Some(predicate_from(predicate)))
434    }
435
436    fn subscribe_inner(
437        &self,
438        predicate: Option<SubscriptionPredicate>,
439    ) -> Result<SubscriptionHandle, LiminalError> {
440        let core = self.core()?;
441        let (handle, registration) = SubscriptionHandle::spawn(core.scheduler(), predicate)?;
442        let pid = registration.pid();
443        core.subscribe(registration)?;
444        // SRV-005: tell the cluster a local subscriber joined this channel so it
445        // can advertise the subscription to peers via its process group.
446        if let Some(observer) = self.actor.observer() {
447            observer.on_subscribe(&self.config.name, pid);
448        }
449        Ok(handle)
450    }
451
452    /// Unsubscribes the subscriber owning `subscription` by its process pid.
453    ///
454    /// # Errors
455    ///
456    /// Returns a [`LiminalError`] when the unsubscribe command fails.
457    pub fn unsubscribe(&self, subscription: &SubscriptionHandle) -> Result<(), LiminalError> {
458        let pid = subscription.pid();
459        self.core()?.unsubscribe(pid)?;
460        // SRV-005: tell the cluster the local subscriber left so it can withdraw
461        // the subscription from its process group.
462        if let Some(observer) = self.actor.observer() {
463            observer.on_unsubscribe(&self.config.name, pid);
464        }
465        Ok(())
466    }
467
468    /// Flushes buffered durable channel state to the backing store before shutdown.
469    ///
470    /// # Errors
471    ///
472    /// Returns a [`LiminalError`] when the channel actor cannot be inspected or
473    /// when the durable store flush fails.
474    pub fn flush(&self) -> Result<(), LiminalError> {
475        // Confirm the actor is reachable (and restart it if needed) before flush.
476        drop(self.core()?);
477        let Some(durable) = self.durable.as_ref() else {
478            return Ok(());
479        };
480        let flush_result = {
481            let channel = durable
482                .lock()
483                .map_err(|error| LiminalError::PublishFailed {
484                    message: format!("durable channel state unavailable: {error}"),
485                })?;
486            block_on(channel.flush_store())
487        };
488        flush_result
489            .map_err(|error| LiminalError::PublishFailed {
490                message: format!(
491                    "durable flush bridge for channel '{}' failed: {error}",
492                    self.config.name
493                ),
494            })?
495            .map_err(|error| LiminalError::PublishFailed {
496                message: format!(
497                    "durable flush for channel '{}' failed: {error}",
498                    self.config.name
499                ),
500            })?;
501        Ok(())
502    }
503
504    /// Returns the number of currently-active subscribers on the channel actor.
505    ///
506    /// # Errors
507    ///
508    /// Returns a [`LiminalError`] when the actor cannot service the query.
509    pub fn subscriber_count(&self) -> Result<usize, LiminalError> {
510        Ok(self.core()?.list_subscribers()?.len())
511    }
512
513    /// Closes the channel gracefully, stopping the actor process.
514    ///
515    /// # Errors
516    ///
517    /// Returns a [`LiminalError`] when the channel cannot be shut down.
518    pub fn close(&self) -> Result<(), LiminalError> {
519        self.core()?.close()
520    }
521
522    fn core(&self) -> Result<Arc<ChannelActorCore>, LiminalError> {
523        self.actor.core(&self.config.schema)
524    }
525
526    /// The channel actor's current beamr pid, ensuring it is running first.
527    /// Test-only: lets restart tests crash the exact actor process.
528    #[cfg(test)]
529    pub(crate) fn actor_pid(&self) -> Result<u64, LiminalError> {
530        let core = self.core()?;
531        core.current_pid()?
532            .ok_or_else(|| LiminalError::DeliveryFailed {
533                message: "channel actor has no live pid".to_owned(),
534            })
535    }
536
537    /// The scheduler the channel actor and its subscribers run on (test-only).
538    #[cfg(test)]
539    pub(crate) fn scheduler(&self) -> Result<Arc<beamr::scheduler::Scheduler>, LiminalError> {
540        Ok(Arc::clone(self.core()?.scheduler()))
541    }
542}
543
/// Wall-clock milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A millisecond count too large for
/// `u64` saturates to `u64::MAX`.
fn now_millis() -> u64 {
    let Ok(elapsed) = SystemTime::now().duration_since(UNIX_EPOCH) else {
        return 0;
    };
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}