liminal_server/server/connection/services.rs
1use std::collections::HashMap;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex, mpsc};
6use std::time::Instant;
7
8use haematite::{Database, DatabaseConfig, EventStore};
9use liminal::channel::{ChannelConfig, ChannelHandle, ChannelMode, Schema};
10use liminal::conversation::{
11 ConversationSupervisor, CrashPolicy, EchoBehaviour, ParticipantBehaviour,
12};
13use liminal::durability::bridge::block_on;
14use liminal::durability::{
15 DedupCache, DedupDecision, DurableStore, HaematiteStore, ProcessingReceipt,
16};
17use liminal::protocol::{MessageEnvelope, ProtocolError, SchemaId as ProtocolSchemaId};
18
19use super::conversation::{ConnectionConversation, LiminalConversationResource};
20use super::services_cluster::build_channel_cluster;
21use crate::ServerError;
22use crate::config::types::ServerConfig;
23
24pub use super::services_cluster::ChannelCluster;
25
26/// Registry of custom conversation responders, keyed by conversation subject.
27///
28/// A registered [`ParticipantBehaviour`] becomes the participant for any
29/// conversation opened on its subject; subjects with no entry fall back to the
30/// built-in [`EchoBehaviour`].
31type ResponderRegistry = HashMap<String, Arc<dyn ParticipantBehaviour>>;
32
33/// Marker for resources retained by a connection process until unsubscribe.
34pub trait SubscriptionResource: std::fmt::Debug + Send {
35 /// Releases the library subscription resource.
36 ///
37 /// # Errors
38 /// Returns [`ServerError`] when the liminal library reports an unsubscribe failure.
39 fn unsubscribe(self: Box<Self>) -> Result<(), ServerError>;
40}
41
42/// Library subscription resource owned by a single connection process.
43#[derive(Debug)]
44pub struct ConnectionSubscription {
45 id: u64,
46 selected_schema: ProtocolSchemaId,
47 resource: Box<dyn SubscriptionResource>,
48}
49
50impl ConnectionSubscription {
51 /// Creates an owned subscription resource for one connection process.
52 #[must_use]
53 pub fn new(
54 id: u64,
55 selected_schema: ProtocolSchemaId,
56 resource: Box<dyn SubscriptionResource>,
57 ) -> Self {
58 Self {
59 id,
60 selected_schema,
61 resource,
62 }
63 }
64
65 /// Returns the protocol subscription id.
66 #[must_use]
67 pub const fn id(&self) -> u64 {
68 self.id
69 }
70
71 /// Returns the schema selected for this subscription stream.
72 #[must_use]
73 pub const fn selected_schema(&self) -> ProtocolSchemaId {
74 self.selected_schema
75 }
76
77 pub(super) fn unsubscribe(self) -> Result<(), ServerError> {
78 self.resource.unsubscribe()
79 }
80}
81
/// Result of a server-side publish.
///
/// Pairs the message id assigned to the publish with a genuine delivery ack:
/// `delivered` is `true` only when at least one live subscriber accepted the
/// message on this publish, after dedup-on-delivery suppression is applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PublishOutcome {
    /// Monotonic id assigned to the accepted publish.
    pub message_id: u64,
    /// `true` when a subscriber genuinely received the message. `false` covers
    /// both an accepted publish that found no subscriber (empty channel) and a
    /// duplicate suppressed by dedup-on-delivery.
    pub delivered: bool,
}
96
97/// Operations that adapt wire frames to liminal library calls.
98pub trait ConnectionServices: std::fmt::Debug + Send + Sync {
99 /// Delegates a publish request to the liminal library.
100 ///
101 /// `idempotency_key`, when `Some`, drives dedup-on-delivery: a re-publish with
102 /// the same key is delivered to subscribers at most once. The returned
103 /// [`PublishOutcome`] carries the genuine delivery ack.
104 ///
105 /// # Errors
106 /// Returns [`ServerError`] when the liminal publish operation fails.
107 fn publish(
108 &self,
109 channel: &str,
110 envelope: &MessageEnvelope,
111 idempotency_key: Option<&str>,
112 ) -> Result<PublishOutcome, ServerError>;
113
114 /// Delegates a subscribe request to the liminal library.
115 ///
116 /// # Errors
117 /// Returns [`ServerError`] when the liminal subscribe operation fails.
118 fn subscribe(
119 &self,
120 channel: &str,
121 accepted_schemas: &[ProtocolSchemaId],
122 ) -> Result<ConnectionSubscription, ServerError>;
123
124 /// Delegates unsubscribe to the liminal library.
125 ///
126 /// # Errors
127 /// Returns [`ServerError`] when the liminal unsubscribe operation fails.
128 fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError>;
129
130 /// Delegates conversation open to the liminal library.
131 ///
132 /// # Errors
133 /// Returns [`ServerError`] when the liminal conversation open operation fails.
134 fn open_conversation(
135 &self,
136 conversation_id: u64,
137 subject: &str,
138 ) -> Result<ConnectionConversation, ServerError>;
139
140 /// Delegates a conversation message to the liminal library.
141 ///
142 /// # Errors
143 /// Returns [`ServerError`] when the liminal conversation message operation fails.
144 fn conversation_message(
145 &self,
146 conversation: &ConnectionConversation,
147 envelope: &MessageEnvelope,
148 ) -> Result<(), ServerError>;
149
150 /// Delegates conversation close to the liminal library.
151 ///
152 /// # Errors
153 /// Returns [`ServerError`] when the liminal conversation close operation fails.
154 fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError>;
155
156 /// Flushes durable channel state through the liminal library boundary.
157 ///
158 /// # Errors
159 /// Returns [`ServerError`] when the liminal channel flush operation fails.
160 fn flush_durable_state(&self) -> Result<(), ServerError>;
161}
162
163/// Default adapter from server wire frames to liminal channel/conversation APIs.
164#[derive(Debug)]
165pub struct LiminalConnectionServices {
166 channels: HashMap<String, ConfiguredChannel>,
167 cluster: ChannelCluster,
168 durable_store: Arc<dyn DurableStore>,
169 /// In-memory (haematite-backed) dedup cache for dedup-on-delivery. Keyed by
170 /// the per-message idempotency key carried on the publish frame; a duplicate
171 /// key is suppressed before fan-out so a subscriber receives it at most once.
172 /// Not persisted across restarts (13-L1 scope; durable dedup is deferred).
173 dedup: DedupCache,
174 conversation_supervisor: Arc<ConversationSupervisor>,
175 /// Registered custom conversation responders, keyed by conversation subject.
176 ///
177 /// When a conversation is opened (`open_conversation`), the subject is looked
178 /// up here: a registered [`ParticipantBehaviour`] becomes the conversation's
179 /// participant; with no registration the conversation falls back to the
180 /// built-in [`EchoBehaviour`], preserving the original echo semantics exactly.
181 /// This is the seam aion #13 plugs a remote worker responder into. Interior
182 /// mutability is required because the services are shared behind `&self`.
183 responders: Mutex<ResponderRegistry>,
184 next_message_id: AtomicU64,
185 next_subscription_id: AtomicU64,
186}
187
188impl LiminalConnectionServices {
189 /// Builds library-backed services from validated server configuration.
190 ///
191 /// Durable-mode channels are backed by a shared haematite event store so
192 /// their publishes are persisted and survive the graceful-shutdown flush;
193 /// ephemeral channels carry no store.
194 ///
195 /// # Errors
196 /// Returns [`ServerError`] when a configured channel cannot be initialized.
197 pub fn from_config(config: &ServerConfig) -> Result<Self, ServerError> {
198 let store = build_durable_store(config.persistence_path.as_deref())?;
199 Self::from_config_with_store(config, store)
200 }
201
202 /// Builds services over a caller-provided durable store.
203 ///
204 /// Used by tests that need to inspect persisted state through the same store
205 /// handle the durable channels write to.
206 ///
207 /// # Errors
208 /// Returns [`ServerError`] when a configured channel cannot be initialized.
209 pub fn from_config_with_store(
210 config: &ServerConfig,
211 durable_store: Arc<dyn DurableStore>,
212 ) -> Result<Self, ServerError> {
213 // Build ONE shared channel supervisor for the whole server. When a
214 // [cluster] section is present it is distribution-enabled, so every
215 // channel actor and subscriber shares the clustered scheduler the cluster
216 // attaches its process-group transport to (SRV-005, Constraint B).
217 let cluster = build_channel_cluster(config.cluster.as_ref())?;
218 let mut channels = HashMap::new();
219 for channel in &config.channels {
220 let schema = Schema::new(serde_json::json!({})).map_err(|error| {
221 ServerError::ConfigValidation {
222 message: format!("failed to initialize channel '{}': {error}", channel.name),
223 }
224 })?;
225 let channel_config = if channel.durable {
226 ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Durable)
227 } else {
228 ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Ephemeral)
229 };
230 let handle = if channel.durable {
231 ChannelHandle::new_durable_with_supervisor(
232 channel_config,
233 Arc::clone(&durable_store),
234 cluster.supervisor().clone(),
235 )
236 .map_err(|error| ServerError::ConfigValidation {
237 message: format!(
238 "failed to initialize durable channel '{}': {error}",
239 channel.name
240 ),
241 })?
242 } else {
243 ChannelHandle::with_supervisor(channel_config, cluster.supervisor().clone())
244 };
245 channels.insert(
246 channel.name.clone(),
247 ConfiguredChannel {
248 handle,
249 protocol_schema: schema_ref_id(&channel.schema_ref),
250 },
251 );
252 }
253 let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
254 ServerError::ConfigValidation {
255 message: format!("failed to start conversation supervisor: {error}"),
256 }
257 })?);
258 let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
259 Ok(Self {
260 channels,
261 cluster,
262 durable_store,
263 dedup,
264 conversation_supervisor,
265 responders: Mutex::new(HashMap::new()),
266 next_message_id: AtomicU64::new(1),
267 next_subscription_id: AtomicU64::new(1),
268 })
269 }
270
271 /// Builds services with no configured channels.
272 ///
273 /// # Errors
274 /// Returns [`ServerError`] when the conversation supervisor scheduler cannot start.
275 pub fn empty() -> Result<Self, ServerError> {
276 let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
277 ServerError::ConfigValidation {
278 message: format!("failed to start conversation supervisor: {error}"),
279 }
280 })?);
281 let durable_store = build_durable_store(None)?;
282 let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
283 Ok(Self {
284 channels: HashMap::new(),
285 cluster: build_channel_cluster(None)?,
286 durable_store,
287 dedup,
288 conversation_supervisor,
289 responders: Mutex::new(HashMap::new()),
290 next_message_id: AtomicU64::new(1),
291 next_subscription_id: AtomicU64::new(1),
292 })
293 }
294
295 /// The shared channel supervisor + cluster resolver backing this service.
296 ///
297 /// The server runtime uses this to attach the cluster to the channel
298 /// supervisor's clustered scheduler (SRV-005).
299 #[must_use]
300 pub const fn channel_cluster(&self) -> &ChannelCluster {
301 &self.cluster
302 }
303
304 /// Returns the shared durable store backing this service's durable channels.
305 #[must_use]
306 pub fn durable_store(&self) -> Arc<dyn DurableStore> {
307 Arc::clone(&self.durable_store)
308 }
309
310 /// Returns the conversation supervisor backing supervised conversations.
311 ///
312 /// Tests use this to reach the underlying beamr scheduler so they can spawn
313 /// or terminate participant processes and exercise crash detection.
314 #[must_use]
315 pub fn conversation_supervisor(&self) -> Arc<ConversationSupervisor> {
316 Arc::clone(&self.conversation_supervisor)
317 }
318
319 /// Registers a custom conversation responder for a routing `subject`.
320 ///
321 /// When a conversation is later opened with this exact `subject`, its
322 /// participant runs `behaviour` instead of the built-in [`EchoBehaviour`].
323 /// The responder is spawned and supervised identically to the echo
324 /// participant — a real linked beamr process with the same crash-detection
325 /// semantics — so this exposes the responder seam without changing how
326 /// participants run. Registering a subject that already has a responder
327 /// replaces it; the previous behaviour is returned.
328 ///
329 /// This is the liminal-side seam aion #13 plugs a remote worker into: it
330 /// registers a responder that forwards each request to the worker and routes
331 /// the worker's reply back through the conversation. Subjects with no
332 /// registration keep echoing, so existing callers are unaffected.
333 ///
334 /// # Errors
335 /// Returns [`ServerError`] when the responder registry lock is poisoned.
336 pub fn register_responder(
337 &self,
338 subject: impl Into<String>,
339 behaviour: Arc<dyn ParticipantBehaviour>,
340 ) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
341 let mut responders = self.lock_responders()?;
342 Ok(responders.insert(subject.into(), behaviour))
343 }
344
345 /// Removes the custom responder registered for `subject`, if any.
346 ///
347 /// After removal the subject reverts to the built-in [`EchoBehaviour`] on the
348 /// next [`Self::open_conversation`]. Returns the removed behaviour when one
349 /// was registered.
350 ///
351 /// # Errors
352 /// Returns [`ServerError`] when the responder registry lock is poisoned.
353 pub fn unregister_responder(
354 &self,
355 subject: &str,
356 ) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
357 let mut responders = self.lock_responders()?;
358 Ok(responders.remove(subject))
359 }
360
361 /// Resolves the responder behaviour for `subject`: the registered custom
362 /// responder when present, otherwise the built-in [`EchoBehaviour`].
363 ///
364 /// This is the single routing decision behind the seam — registered-or-echo —
365 /// so the fallback is identical to the original hard-wired echo path.
366 fn responder_for(&self, subject: &str) -> Result<Arc<dyn ParticipantBehaviour>, ServerError> {
367 let responders = self.lock_responders()?;
368 Ok(responders.get(subject).map_or_else(
369 || Arc::new(EchoBehaviour) as Arc<dyn ParticipantBehaviour>,
370 Arc::clone,
371 ))
372 }
373
374 /// Locks the responder registry, mapping a poisoned lock to a [`ServerError`]
375 /// rather than panicking (the workspace denies `unwrap`/`expect`/`panic`).
376 fn lock_responders(&self) -> Result<std::sync::MutexGuard<'_, ResponderRegistry>, ServerError> {
377 self.responders
378 .lock()
379 .map_err(|_poisoned| ServerError::ListenerAccept {
380 message: "responder registry lock poisoned".to_owned(),
381 })
382 }
383
384 /// Subscribes to a configured channel and returns the raw library
385 /// subscription handle so a test can drain the subscriber inbox directly and
386 /// observe exactly which messages reached a subscriber.
387 #[cfg(test)]
388 pub(crate) fn subscribe_handle_for_test(
389 &self,
390 channel: &str,
391 ) -> Result<liminal::channel::SubscriptionHandle, ServerError> {
392 let configured = self
393 .channels
394 .get(channel)
395 .ok_or_else(|| ServerError::ListenerAccept {
396 message: format!("channel '{channel}' is not configured"),
397 })?;
398 configured
399 .handle
400 .subscribe()
401 .map_err(|error| ServerError::ListenerAccept {
402 message: format!("liminal subscribe failed for channel '{channel}': {error}"),
403 })
404 }
405
406 /// Claims the delivery right for an idempotency key.
407 ///
408 /// Returns `Ok(true)` when this is the first publish for the key (the caller
409 /// may deliver), and `Ok(false)` when the key was already claimed/completed (a
410 /// duplicate the caller must suppress). The dedup cache is driven synchronously
411 /// over the in-memory haematite store via the durable bridge.
412 fn claim_delivery(&self, key: &str) -> Result<bool, ServerError> {
413 let decision = block_on(self.dedup.claim_or_get(key, dedup_timestamp_millis()))
414 .map_err(|error| ServerError::ListenerAccept {
415 message: format!("dedup bridge failed for key '{key}': {error}"),
416 })?
417 .map_err(|error| ServerError::ListenerAccept {
418 message: format!("dedup claim failed for key '{key}': {error}"),
419 })?;
420 Ok(matches!(decision, DedupDecision::Claimed))
421 }
422
423 /// Releases a dangling in-flight dedup claim after a failed delivery.
424 ///
425 /// Best-effort: a release failure cannot mask the original publish error, so
426 /// this returns nothing and logs at `error` level instead of surfacing. It is
427 /// never silent — the leak (a permanently suppressed key) must be observable.
428 /// `release_claim` itself never clobbers a stored receipt, so calling it on the
429 /// failure path is safe even if a concurrent completion raced ahead.
430 fn release_claim(&self, key: &str) {
431 match block_on(self.dedup.release_claim(key)) {
432 Ok(Ok(())) => {}
433 Ok(Err(error)) => {
434 tracing::error!(
435 idempotency_key = key,
436 %error,
437 "failed to release dedup claim after publish failure; key may stay suppressed"
438 );
439 }
440 Err(error) => {
441 tracing::error!(
442 idempotency_key = key,
443 %error,
444 "dedup release bridge failed after publish failure; key may stay suppressed"
445 );
446 }
447 }
448 }
449}
450
/// Current wall-clock time in milliseconds since the Unix epoch, used to anchor
/// dedup entries.
///
/// Yields `0` if the clock reads earlier than the epoch, or if the millisecond
/// count does not fit in `u64`. The value only anchors cache expiry. A zero
/// anchor cannot break the at-most-once claim, so a clock fault is deliberately
/// not surfaced on the publish path.
fn dedup_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(0),
        Err(_clock_before_epoch) => 0,
    }
}
464
/// Stream-name prefix for the dedup-on-delivery cache.
///
/// Keeps delivery-dedup keys from colliding with any other haematite stream
/// living in the same shared store.
const DELIVERY_DEDUP_NAMESPACE: &str = "liminal:delivery-dedup";

/// Number of shard actors requested when a fresh on-disk durable store is
/// created.
///
/// Haematite spreads keys over this many single-threaded shard actors. A small
/// power of two buys parallelism across cursors/streams without an actor per
/// core. Haematite has no silent default, so the value is pinned here; it is
/// not yet exposed through server config.
const DEFAULT_SHARD_COUNT: usize = 8;
476
477/// Builds the on-disk haematite-backed durable store.
478///
479/// When `persistence_path` is `Some`, the database lives there and survives
480/// process restarts: an existing database directory is reopened, a fresh one is
481/// created. When it is `None` (no durable path configured, or the channel-free
482/// `empty()` services used by tests), an ephemeral per-instance directory under
483/// the system temp dir is created instead — it still persists to disk for the
484/// lifetime of the process, but is not a stable restart location.
485fn build_durable_store(
486 persistence_path: Option<&Path>,
487) -> Result<Arc<dyn DurableStore>, ServerError> {
488 let data_dir = persistence_path.map_or_else(ephemeral_data_dir, |path| path.join("durability"));
489 let database = open_or_create_database(&data_dir)?;
490 let event_store = EventStore::new(database);
491 Ok(Arc::new(HaematiteStore::new(Arc::new(event_store))))
492}
493
494/// Opens an existing haematite database at `data_dir`, or creates one.
495fn open_or_create_database(data_dir: &Path) -> Result<Database, ServerError> {
496 let config_file = data_dir.join("config.json");
497 let result = if config_file.exists() {
498 Database::open(data_dir)
499 } else {
500 Database::create(DatabaseConfig {
501 data_dir: data_dir.to_path_buf(),
502 shard_count: DEFAULT_SHARD_COUNT,
503 sweep_interval: None,
504 distributed: None,
505 })
506 };
507 result.map_err(|error| ServerError::ConfigValidation {
508 message: format!(
509 "failed to open durable store at {}: {error}",
510 data_dir.display()
511 ),
512 })
513}
514
/// Returns a fresh, process-unique data directory path under the system temp
/// dir.
///
/// The name combines the process id with a per-process monotonic counter, so
/// concurrently running servers and parallel tests never pick the same database
/// directory. Only the path is computed here; nothing is created on disk.
fn ephemeral_data_dir() -> PathBuf {
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);
    let unique = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    let mut dir = std::env::temp_dir();
    dir.push(format!("liminal-durability-{}-{unique}", std::process::id()));
    dir
}
527
528impl ConnectionServices for LiminalConnectionServices {
529 fn publish(
530 &self,
531 channel: &str,
532 envelope: &MessageEnvelope,
533 idempotency_key: Option<&str>,
534 ) -> Result<PublishOutcome, ServerError> {
535 let handle = self
536 .channels
537 .get(channel)
538 .map(|configured| configured.handle.clone())
539 .ok_or_else(|| ServerError::ListenerAccept {
540 message: format!("channel '{channel}' is not configured"),
541 })?;
542
543 // Dedup-on-delivery: a publish carrying an idempotency key is delivered to
544 // subscribers AT MOST ONCE across re-publishes of the same key. Only a
545 // fresh `Claimed` decision proceeds to fan-out; a `Completed`/`InFlight`
546 // decision is a duplicate and is suppressed (no second delivery), which is
547 // the at-most-once guarantee the aion outbox relies on.
548 if let Some(key) = idempotency_key {
549 if !self.claim_delivery(key)? {
550 return Ok(PublishOutcome {
551 message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
552 delivered: false,
553 });
554 }
555 }
556
557 let delivery = handle.publish_with_delivery(
558 &envelope.payload,
559 liminal::envelope::PublisherId::default(),
560 None,
561 );
562 let delivery = match delivery {
563 Ok(delivery) => delivery,
564 Err(error) => {
565 // The claim above appended an `InFlight` entry but the delivery
566 // failed before `complete_receipt` could run. Release the claim so
567 // the key is re-claimable; otherwise every re-publish would see
568 // `InFlight` and be suppressed forever. Best-effort: surface the
569 // ORIGINAL publish error regardless, but never swallow a release
570 // failure silently (it leaves the leak intact).
571 if let Some(key) = idempotency_key {
572 self.release_claim(key);
573 }
574 return Err(ServerError::ListenerAccept {
575 message: format!("liminal publish failed for channel '{channel}': {error}"),
576 });
577 }
578 };
579
580 // Record the dedup completion AFTER a successful claimed delivery so the
581 // claim is not left dangling `InFlight` (which would wrongly defer every
582 // future duplicate). The receipt body is empty: the dedup contract here
583 // only needs presence, not a stored result.
584 if let Some(key) = idempotency_key {
585 block_on(
586 self.dedup
587 .complete_receipt(key, ProcessingReceipt::new(Vec::new())),
588 )
589 .map_err(|error| ServerError::ListenerAccept {
590 message: format!("dedup receipt bridge failed for key '{key}': {error}"),
591 })?
592 .map_err(|error| ServerError::ListenerAccept {
593 message: format!("dedup receipt write failed for key '{key}': {error}"),
594 })?;
595 }
596
597 Ok(PublishOutcome {
598 message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
599 delivered: delivery.is_delivered(),
600 })
601 }
602
603 fn subscribe(
604 &self,
605 channel: &str,
606 accepted_schemas: &[ProtocolSchemaId],
607 ) -> Result<ConnectionSubscription, ServerError> {
608 let configured = self
609 .channels
610 .get(channel)
611 .ok_or_else(|| ServerError::ListenerAccept {
612 message: format!("channel '{channel}' is not configured"),
613 })?;
614 let selected_schema = if accepted_schemas.is_empty() {
615 configured.protocol_schema
616 } else {
617 liminal::protocol::negotiate_schema(configured.protocol_schema, accepted_schemas)
618 .map_err(|error| server_error_from_protocol(&error))?
619 };
620 let subscription =
621 configured
622 .handle
623 .subscribe()
624 .map_err(|error| ServerError::ListenerAccept {
625 message: format!("liminal subscribe failed for channel '{channel}': {error}"),
626 })?;
627 let id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
628 Ok(ConnectionSubscription::new(
629 id,
630 selected_schema,
631 Box::new(LiminalSubscriptionResource { subscription }),
632 ))
633 }
634
635 fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError> {
636 subscription.unsubscribe()
637 }
638
639 fn open_conversation(
640 &self,
641 conversation_id: u64,
642 subject: &str,
643 ) -> Result<ConnectionConversation, ServerError> {
644 // Spawn a REAL participant process (a beamr `NativeHandler` running the
645 // resolved responder behaviour) on the conversation supervisor's
646 // scheduler, and a supervised conversation actor linked to it. The actor
647 // FORWARDS each conversation message to the participant, which genuinely
648 // processes it and delivers a reply back. The actor traps the
649 // participant's EXIT (a beamr process link), so killing it fires a
650 // structural, microsecond-scale crash signal.
651 //
652 // The responder is chosen by `subject`: a custom responder registered via
653 // `register_responder` for this subject, or the built-in `EchoBehaviour`
654 // when none is registered. Either way it runs as the SAME supervised,
655 // linked participant process — the seam changes WHO responds, not HOW the
656 // participant is spawned or supervised.
657 let behaviour = self.responder_for(subject)?;
658 let (actor, participant) = self
659 .conversation_supervisor
660 .spawn_with_participant(behaviour, None, ChannelMode::Ephemeral, CrashPolicy::Fail)
661 .map_err(|error| ServerError::ListenerAccept {
662 message: format!(
663 "failed to spawn supervised conversation {conversation_id} ('{subject}'): {error}"
664 ),
665 })?;
666
667 // Drive boot to completion so the beamr link to the participant exists
668 // before any message is forwarded (link-before-forward), mirroring the
669 // ROUTING-004 dispatch pattern.
670 actor.pid().map_err(|error| ServerError::ListenerAccept {
671 message: format!(
672 "failed to boot supervised conversation {conversation_id} ('{subject}'): {error}"
673 ),
674 })?;
675
676 // Register the structural EXIT notifier BEFORE returning, so a crash that
677 // fires the instant a message reaches the participant is never missed.
678 // The notifier is woken by the actor's trapped-EXIT handler (event
679 // driven), and a crash that already landed is replayed immediately.
680 let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
681 actor
682 .notify_on_participant_exit(participant, exit_tx)
683 .map_err(|error| ServerError::ListenerAccept {
684 message: format!(
685 "failed to arm crash detection for conversation {conversation_id}: {error}"
686 ),
687 })?;
688
689 Ok(ConnectionConversation::new(Box::new(
690 LiminalConversationResource::new(actor, participant, exit_rx),
691 )))
692 }
693
694 fn conversation_message(
695 &self,
696 conversation: &ConnectionConversation,
697 envelope: &MessageEnvelope,
698 ) -> Result<(), ServerError> {
699 conversation.message(envelope)
700 }
701
702 fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError> {
703 conversation.close()
704 }
705
706 fn flush_durable_state(&self) -> Result<(), ServerError> {
707 for (channel_name, configured) in &self.channels {
708 if configured.handle.config().mode == ChannelMode::Durable {
709 configured
710 .handle
711 .flush()
712 .map_err(|error| ServerError::ShutdownFlush {
713 message: format!(
714 "failed to flush durable channel '{channel_name}': {error}"
715 ),
716 })?;
717 }
718 }
719 Ok(())
720 }
721}
722
723#[derive(Debug)]
724struct ConfiguredChannel {
725 handle: ChannelHandle,
726 protocol_schema: ProtocolSchemaId,
727}
728
729#[derive(Debug)]
730struct LiminalSubscriptionResource {
731 subscription: liminal::channel::SubscriptionHandle,
732}
733
734impl SubscriptionResource for LiminalSubscriptionResource {
735 fn unsubscribe(self: Box<Self>) -> Result<(), ServerError> {
736 drop(self.subscription);
737 Ok(())
738 }
739}
740
741pub(super) fn server_error_from_protocol(error: &ProtocolError) -> ServerError {
742 ServerError::ListenerAccept {
743 message: format!("protocol operation failed: {error}"),
744 }
745}
746
747fn schema_ref_id(schema_ref: &str) -> ProtocolSchemaId {
748 let mut bytes = [0_u8; ProtocolSchemaId::WIRE_LEN];
749 let mut hash = std::collections::hash_map::DefaultHasher::new();
750 schema_ref.hash(&mut hash);
751 let seed = hash.finish().to_be_bytes();
752 for (index, byte) in bytes.iter_mut().enumerate() {
753 *byte = seed[index % seed.len()];
754 }
755 ProtocolSchemaId::new(bytes)
756}