liminal/channel/types.rs
1//! Public channel surface: [`ChannelConfig`], [`ChannelMode`], and the
2//! cloneable [`ChannelHandle`] that drives a REAL supervised beamr channel
3//! actor (LIM-002).
4//!
5//! The handle is a thin, synchronous-looking facade over the process-backed
6//! actor: every operation enqueues a typed command onto the actor's mailbox and
7//! blocks on a per-command reply (the haematite `ShardHandle` pattern). It owns
8//! no subscriber state and performs no fan-out itself — the actor process does.
9//!
10//! `ChannelHandle::new` stays infallible (existing call-sites depend on it): the
11//! actor is spawned lazily on first use and the spawn result is memoised, so a
12//! scheduler failure surfaces as a `LiminalError` from the first operation
13//! rather than as a panic.
14
15use std::sync::atomic::AtomicU32;
16use std::sync::{Arc, Mutex, OnceLock};
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use serde_json::Value;
20
21use crate::causal::CausalContext;
22use crate::channel::actor::{ChannelActorCore, predicate_from};
23use crate::channel::observer::ClusterObserver;
24use crate::channel::schema::{Schema, SchemaId, SchemaValidationError};
25use crate::channel::subscription::{SubscriptionHandle, SubscriptionPredicate};
26use crate::channel::supervisor::{ChannelSupervisor, shared_supervisor};
27use crate::durability::bridge::block_on;
28use crate::durability::{DurableChannel, DurableStore, MessageEnvelope};
29use crate::envelope::{Envelope, PublisherId};
30use crate::error::LiminalError;
31
32/// Single-partition count used to back a flat runtime channel with durable storage.
33const RUNTIME_DURABLE_PARTITIONS: usize = 1;
34
/// Delivery acknowledgement produced by [`ChannelHandle::publish_with_delivery`].
///
/// This is not a backpressure signal: it records how many subscribers actually
/// received a published message, rather than whether the bus admitted it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelDelivery {
    delivered_count: usize,
}

impl ChannelDelivery {
    /// Returns how many local subscribers the message was delivered to.
    #[must_use]
    pub const fn delivered_count(&self) -> usize {
        self.delivered_count
    }

    /// Returns `true` when at least one subscriber received the message.
    #[must_use]
    pub const fn is_delivered(&self) -> bool {
        // `delivered_count` is unsigned, so "non-zero" is the same as "positive".
        self.delivered_count != 0
    }
}
57
/// Selects whether a channel is memory-only or durable across restarts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelMode {
    /// Memory-only mode: nothing is persisted, so there is no persistence overhead.
    Ephemeral,
    /// Durable mode, reserved for future haematite-backed storage.
    Durable,
}
66
/// Compatibility alias: `SchemaRef` names exactly the same type as the
/// channel-owned [`Schema`] definition, so the two are interchangeable.
pub type SchemaRef = Schema;
69
70/// Required configuration for creating a typed channel.
71#[derive(Clone, Debug)]
72pub struct ChannelConfig {
73 /// Explicit channel name.
74 pub name: String,
75 /// Explicit schema for validating published payloads.
76 pub schema: Schema,
77 /// Explicit durability mode for the channel.
78 pub mode: ChannelMode,
79}
80
81impl ChannelConfig {
82 /// Creates channel configuration from its required fields.
83 #[must_use]
84 pub const fn new(name: String, schema: Schema, mode: ChannelMode) -> Self {
85 Self { name, schema, mode }
86 }
87}
88
89/// A lazily-spawned, supervised channel actor shared by every clone of a handle.
90///
91/// `supervisor` is stored as a `Result` so [`ChannelHandle::new`] can stay
92/// infallible: a scheduler-start failure is captured here and surfaced as a
93/// `LiminalError` the first time the actor is actually used.
94struct ChannelActorState {
95 supervisor: Result<ChannelSupervisor, String>,
96 core: OnceLock<Result<Arc<ChannelActorCore>, String>>,
97 restarts: AtomicU32,
98}
99
100impl ChannelActorState {
101 const fn new(supervisor: Result<ChannelSupervisor, String>) -> Self {
102 Self {
103 supervisor,
104 core: OnceLock::new(),
105 restarts: AtomicU32::new(0),
106 }
107 }
108
109 fn supervisor(&self) -> Result<&ChannelSupervisor, LiminalError> {
110 self.supervisor
111 .as_ref()
112 .map_err(|message| LiminalError::PublishFailed {
113 message: format!("channel supervisor unavailable: {message}"),
114 })
115 }
116
117 /// The installed cluster observer, if this channel runs on a clustered
118 /// supervisor (SRV-005). Returns `None` for non-clustered channels.
119 fn observer(&self) -> Option<Arc<dyn ClusterObserver>> {
120 self.supervisor
121 .as_ref()
122 .ok()
123 .and_then(|supervisor| supervisor.observer().cloned())
124 }
125
126 /// Returns the live actor core, spawning it (and any restart) on demand.
127 fn core(&self, schema: &Schema) -> Result<Arc<ChannelActorCore>, LiminalError> {
128 let supervisor = self.supervisor()?;
129 let stored = self.core.get_or_init(|| {
130 supervisor
131 .spawn_channel(schema.clone())
132 .map_err(|error| error.to_string())
133 });
134 let core = stored
135 .as_ref()
136 .map_err(|message| LiminalError::PublishFailed {
137 message: format!("channel actor unavailable: {message}"),
138 })?;
139 // Restart on a dead pid (R4) before returning the core for use.
140 supervisor.ensure_running(core, &self.restarts)?;
141 Ok(Arc::clone(core))
142 }
143}
144
145impl std::fmt::Debug for ChannelActorState {
146 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 formatter
148 .debug_struct("ChannelActorState")
149 .field("supervisor", &self.supervisor)
150 .finish_non_exhaustive()
151 }
152}
153
154/// Cloneable handle for interacting with a channel actor process.
155#[derive(Clone, Debug)]
156pub struct ChannelHandle {
157 config: ChannelConfig,
158 actor: Arc<ChannelActorState>,
159 durable: Option<Arc<Mutex<DurableChannel>>>,
160}
161
162impl ChannelHandle {
163 /// Creates an ephemeral handle backed by a real supervised channel actor on
164 /// the shared default supervisor.
165 ///
166 /// The actor process is spawned lazily on first use; a scheduler failure is
167 /// surfaced as a [`LiminalError`] from the first operation, not a panic.
168 #[must_use]
169 pub fn new(config: ChannelConfig) -> Self {
170 let supervisor = shared_supervisor().map_err(|error| error.to_string());
171 Self {
172 config,
173 actor: Arc::new(ChannelActorState::new(supervisor)),
174 durable: None,
175 }
176 }
177
178 /// Creates an ephemeral handle bound to an explicit `supervisor` (isolation
179 /// for the registry and tests).
180 #[must_use]
181 pub fn with_supervisor(config: ChannelConfig, supervisor: ChannelSupervisor) -> Self {
182 Self {
183 config,
184 actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
185 durable: None,
186 }
187 }
188
189 /// Creates a durable handle that persists every accepted publish to `store`
190 /// before fanning it out to subscribers.
191 ///
192 /// # Errors
193 ///
194 /// Returns [`LiminalError::PublishFailed`] when the durable channel cannot be
195 /// initialized over `store`.
196 pub fn new_durable(
197 config: ChannelConfig,
198 store: Arc<dyn DurableStore>,
199 ) -> Result<Self, LiminalError> {
200 let durable = DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store)
201 .map_err(|error| LiminalError::PublishFailed {
202 message: format!(
203 "failed to initialize durable channel '{}': {error}",
204 config.name
205 ),
206 })?;
207 let supervisor = shared_supervisor()?;
208 Ok(Self {
209 config,
210 actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
211 durable: Some(Arc::new(Mutex::new(durable))),
212 })
213 }
214
215 /// Creates a durable handle bound to an explicit `supervisor`.
216 ///
217 /// Used by the standalone server so every channel — durable or ephemeral —
218 /// shares ONE (optionally clustered) supervisor and thus one scheduler, which
219 /// is the precondition for cross-node delivery (SRV-005): a subscriber pid
220 /// joined to a channel's distributed process group must live on the same
221 /// scheduler that owns the distribution links.
222 ///
223 /// # Errors
224 ///
225 /// Returns [`LiminalError::PublishFailed`] when the durable channel cannot be
226 /// initialized over `store`.
227 pub fn new_durable_with_supervisor(
228 config: ChannelConfig,
229 store: Arc<dyn DurableStore>,
230 supervisor: ChannelSupervisor,
231 ) -> Result<Self, LiminalError> {
232 let durable = DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store)
233 .map_err(|error| LiminalError::PublishFailed {
234 message: format!(
235 "failed to initialize durable channel '{}': {error}",
236 config.name
237 ),
238 })?;
239 Ok(Self {
240 config,
241 actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
242 durable: Some(Arc::new(Mutex::new(durable))),
243 })
244 }
245
246 /// Returns the channel configuration used to create this handle.
247 #[must_use]
248 pub const fn config(&self) -> &ChannelConfig {
249 &self.config
250 }
251
252 /// Publishes a payload to the channel with the default publisher identity.
253 ///
254 /// # Errors
255 ///
256 /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
257 pub fn publish<Payload>(&self, payload: Payload) -> Result<(), LiminalError>
258 where
259 Payload: AsRef<[u8]>,
260 {
261 self.publish_with_context(payload, PublisherId::default(), None)
262 }
263
264 /// Publishes a payload with an explicit publisher identity.
265 ///
266 /// # Errors
267 ///
268 /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
269 pub fn publish_from<Payload>(
270 &self,
271 publisher_id: impl Into<PublisherId>,
272 payload: Payload,
273 ) -> Result<(), LiminalError>
274 where
275 Payload: AsRef<[u8]>,
276 {
277 self.publish_with_context(payload, publisher_id.into(), None)
278 }
279
280 /// Publishes a payload with explicit publisher and causal metadata.
281 ///
282 /// # Errors
283 ///
284 /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
285 pub fn publish_with_context<Payload>(
286 &self,
287 payload: Payload,
288 publisher_id: PublisherId,
289 causal_context: Option<CausalContext>,
290 ) -> Result<(), LiminalError>
291 where
292 Payload: AsRef<[u8]>,
293 {
294 self.publish_with_delivery(payload, publisher_id, causal_context)
295 .map(|_delivery| ())
296 }
297
298 /// Publishes a payload and reports a genuine delivery ack.
299 ///
300 /// Returns a [`ChannelDelivery`] whose `delivered_count` is the number of
301 /// local subscribers the message was actually delivered to. A caller that
302 /// needs to know the message was ACCEPTED by a subscriber (not merely
303 /// buffered/published) inspects [`ChannelDelivery::is_delivered`]. This is the
304 /// channel-library half of the 13-L1 delivery-ack signal; the publish-without-
305 /// delivery methods stay unchanged for existing callers.
306 ///
307 /// # Errors
308 ///
309 /// Returns a [`LiminalError`] when the channel cannot accept the payload or the schema rejects it.
310 pub fn publish_with_delivery<Payload>(
311 &self,
312 payload: Payload,
313 publisher_id: PublisherId,
314 causal_context: Option<CausalContext>,
315 ) -> Result<ChannelDelivery, LiminalError>
316 where
317 Payload: AsRef<[u8]>,
318 {
319 // Durable channels persist the message to the store BEFORE acknowledging
320 // the publish (and before fanning out): a published message that was not
321 // durably recorded would be lost on shutdown, which CN7 forbids.
322 if let Some(durable) = self.durable.as_ref() {
323 self.persist_durable(durable, payload.as_ref(), &publisher_id)?;
324 }
325 let core = self.core()?;
326 let outcome = core.publish(payload.as_ref().to_vec(), publisher_id, causal_context)?;
327 // SRV-005: hand the normalised envelope to the cluster observer so it can
328 // fan the message out to remote subscribers. Local fan-out already
329 // happened inside `core.publish`; this is purely the cross-node leg and
330 // is a no-op when no observer is installed (non-clustered channels).
331 if let Some(observer) = self.actor.observer() {
332 observer.on_publish(&self.config.name, &outcome.envelope);
333 }
334 Ok(ChannelDelivery {
335 delivered_count: outcome.delivered_count,
336 })
337 }
338
339 fn persist_durable(
340 &self,
341 durable: &Arc<Mutex<DurableChannel>>,
342 payload: &[u8],
343 publisher_id: &PublisherId,
344 ) -> Result<(), LiminalError> {
345 let envelope = MessageEnvelope {
346 payload: payload.to_vec(),
347 causal_context: None,
348 timestamp: now_millis(),
349 publisher_id: publisher_id.as_str().to_owned(),
350 idempotency_key: None,
351 };
352 let publish_result = {
353 let mut channel = durable
354 .lock()
355 .map_err(|error| LiminalError::PublishFailed {
356 message: format!("durable channel state unavailable: {error}"),
357 })?;
358 block_on(channel.publish(&envelope))
359 };
360 publish_result
361 .map_err(|error| LiminalError::PublishFailed {
362 message: format!(
363 "durable publish bridge for channel '{}' failed: {error}",
364 self.config.name
365 ),
366 })?
367 .map_err(|error| LiminalError::PublishFailed {
368 message: format!(
369 "durable publish to channel '{}' failed: {error}",
370 self.config.name
371 ),
372 })?;
373 Ok(())
374 }
375
376 /// Returns the schema version currently owned by the channel actor.
377 ///
378 /// # Errors
379 ///
380 /// Returns a [`LiminalError`] when the channel actor cannot be read.
381 pub fn current_schema_id(&self) -> Result<SchemaId, LiminalError> {
382 self.core()?.schema_id()
383 }
384
385 /// Evolves the channel schema by adding a defaulted field without disconnecting subscribers.
386 ///
387 /// # Errors
388 ///
389 /// Returns [`SchemaValidationError`] when the schema cannot be evolved.
390 pub fn evolve_schema_add_field(
391 &self,
392 name: impl Into<String>,
393 field_schema: Value,
394 default: Value,
395 ) -> Result<SchemaId, SchemaValidationError> {
396 let core = self
397 .core()
398 .map_err(|error| SchemaValidationError::InvalidSchema {
399 message: error.to_string(),
400 })?;
401 core.evolve(name.into(), field_schema, default)
402 }
403
404 /// Subscribes to the channel, receiving every published message.
405 ///
406 /// # Errors
407 ///
408 /// Returns a [`LiminalError`] when a subscription cannot be created.
409 pub fn subscribe(&self) -> Result<SubscriptionHandle, LiminalError> {
410 self.subscribe_inner(None)
411 }
412
413 /// Subscribes with a delivery predicate: only messages for which `predicate`
414 /// returns `true` are delivered to this subscriber. The predicate is owned
415 /// and evaluated by the actor process (R3).
416 ///
417 /// # Clustering
418 ///
419 /// The predicate filters **local-node publishes only**. Under clustering
420 /// (SRV-005), messages published on a remote node are delivered to this
421 /// subscriber *ungated* — the predicate is a non-serializable closure and is
422 /// not propagated across the wire, so remote nodes cannot evaluate it. If you
423 /// need filtering to hold for cross-node traffic, filter again on receipt
424 /// rather than relying on this predicate alone.
425 ///
426 /// # Errors
427 ///
428 /// Returns a [`LiminalError`] when a subscription cannot be created.
429 pub fn subscribe_filtered<F>(&self, predicate: F) -> Result<SubscriptionHandle, LiminalError>
430 where
431 F: Fn(&Envelope) -> bool + Send + Sync + 'static,
432 {
433 self.subscribe_inner(Some(predicate_from(predicate)))
434 }
435
436 fn subscribe_inner(
437 &self,
438 predicate: Option<SubscriptionPredicate>,
439 ) -> Result<SubscriptionHandle, LiminalError> {
440 let core = self.core()?;
441 let (handle, registration) = SubscriptionHandle::spawn(core.scheduler(), predicate)?;
442 let pid = registration.pid();
443 core.subscribe(registration)?;
444 // SRV-005: tell the cluster a local subscriber joined this channel so it
445 // can advertise the subscription to peers via its process group.
446 if let Some(observer) = self.actor.observer() {
447 observer.on_subscribe(&self.config.name, pid);
448 }
449 Ok(handle)
450 }
451
452 /// Unsubscribes the subscriber owning `subscription` by its process pid.
453 ///
454 /// # Errors
455 ///
456 /// Returns a [`LiminalError`] when the unsubscribe command fails.
457 pub fn unsubscribe(&self, subscription: &SubscriptionHandle) -> Result<(), LiminalError> {
458 let pid = subscription.pid();
459 self.core()?.unsubscribe(pid)?;
460 // SRV-005: tell the cluster the local subscriber left so it can withdraw
461 // the subscription from its process group.
462 if let Some(observer) = self.actor.observer() {
463 observer.on_unsubscribe(&self.config.name, pid);
464 }
465 Ok(())
466 }
467
468 /// Flushes buffered durable channel state to the backing store before shutdown.
469 ///
470 /// # Errors
471 ///
472 /// Returns a [`LiminalError`] when the channel actor cannot be inspected or
473 /// when the durable store flush fails.
474 pub fn flush(&self) -> Result<(), LiminalError> {
475 // Confirm the actor is reachable (and restart it if needed) before flush.
476 drop(self.core()?);
477 let Some(durable) = self.durable.as_ref() else {
478 return Ok(());
479 };
480 let flush_result = {
481 let channel = durable
482 .lock()
483 .map_err(|error| LiminalError::PublishFailed {
484 message: format!("durable channel state unavailable: {error}"),
485 })?;
486 block_on(channel.flush_store())
487 };
488 flush_result
489 .map_err(|error| LiminalError::PublishFailed {
490 message: format!(
491 "durable flush bridge for channel '{}' failed: {error}",
492 self.config.name
493 ),
494 })?
495 .map_err(|error| LiminalError::PublishFailed {
496 message: format!(
497 "durable flush for channel '{}' failed: {error}",
498 self.config.name
499 ),
500 })?;
501 Ok(())
502 }
503
504 /// Returns the number of currently-active subscribers on the channel actor.
505 ///
506 /// # Errors
507 ///
508 /// Returns a [`LiminalError`] when the actor cannot service the query.
509 pub fn subscriber_count(&self) -> Result<usize, LiminalError> {
510 Ok(self.core()?.list_subscribers()?.len())
511 }
512
513 /// Closes the channel gracefully, stopping the actor process.
514 ///
515 /// # Errors
516 ///
517 /// Returns a [`LiminalError`] when the channel cannot be shut down.
518 pub fn close(&self) -> Result<(), LiminalError> {
519 self.core()?.close()
520 }
521
522 fn core(&self) -> Result<Arc<ChannelActorCore>, LiminalError> {
523 self.actor.core(&self.config.schema)
524 }
525
526 /// The channel actor's current beamr pid, ensuring it is running first.
527 /// Test-only: lets restart tests crash the exact actor process.
528 #[cfg(test)]
529 pub(crate) fn actor_pid(&self) -> Result<u64, LiminalError> {
530 let core = self.core()?;
531 core.current_pid()?
532 .ok_or_else(|| LiminalError::DeliveryFailed {
533 message: "channel actor has no live pid".to_owned(),
534 })
535 }
536
537 /// The scheduler the channel actor and its subscribers run on (test-only).
538 #[cfg(test)]
539 pub(crate) fn scheduler(&self) -> Result<Arc<beamr::scheduler::Scheduler>, LiminalError> {
540 Ok(Arc::clone(self.core()?.scheduler()))
541 }
542}
543
/// Returns the current time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and
/// `u64::MAX` when the millisecond count does not fit in a `u64`.
fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
551}