Skip to main content

liminal_server/server/connection/
services.rs

1use std::collections::HashMap;
2use std::hash::{Hash, Hasher};
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex, mpsc};
6use std::time::Instant;
7
8use haematite::{Database, DatabaseConfig, EventStore};
9use liminal::channel::{ChannelConfig, ChannelHandle, ChannelMode, Schema};
10use liminal::conversation::{
11    ConversationSupervisor, CrashPolicy, EchoBehaviour, ParticipantBehaviour,
12};
13use liminal::durability::bridge::block_on;
14use liminal::durability::{
15    DedupCache, DedupDecision, DurableStore, HaematiteStore, ProcessingReceipt,
16};
17use liminal::protocol::{MessageEnvelope, ProtocolError, SchemaId as ProtocolSchemaId};
18
19use super::conversation::{ConnectionConversation, LiminalConversationResource};
20use super::services_cluster::build_channel_cluster;
21use crate::ServerError;
22use crate::config::types::ServerConfig;
23
24pub use super::services_cluster::ChannelCluster;
25
26/// Registry of custom conversation responders, keyed by conversation subject.
27///
28/// A registered [`ParticipantBehaviour`] becomes the participant for any
29/// conversation opened on its subject; subjects with no entry fall back to the
30/// built-in [`EchoBehaviour`].
31type ResponderRegistry = HashMap<String, Arc<dyn ParticipantBehaviour>>;
32
33/// Marker for resources retained by a connection process until unsubscribe.
34pub trait SubscriptionResource: std::fmt::Debug + Send {
35    /// Releases the library subscription resource.
36    ///
37    /// # Errors
38    /// Returns [`ServerError`] when the liminal library reports an unsubscribe failure.
39    fn unsubscribe(self: Box<Self>) -> Result<(), ServerError>;
40}
41
42/// Library subscription resource owned by a single connection process.
43#[derive(Debug)]
44pub struct ConnectionSubscription {
45    id: u64,
46    selected_schema: ProtocolSchemaId,
47    resource: Box<dyn SubscriptionResource>,
48}
49
50impl ConnectionSubscription {
51    /// Creates an owned subscription resource for one connection process.
52    #[must_use]
53    pub fn new(
54        id: u64,
55        selected_schema: ProtocolSchemaId,
56        resource: Box<dyn SubscriptionResource>,
57    ) -> Self {
58        Self {
59            id,
60            selected_schema,
61            resource,
62        }
63    }
64
65    /// Returns the protocol subscription id.
66    #[must_use]
67    pub const fn id(&self) -> u64 {
68        self.id
69    }
70
71    /// Returns the schema selected for this subscription stream.
72    #[must_use]
73    pub const fn selected_schema(&self) -> ProtocolSchemaId {
74        self.selected_schema
75    }
76
77    pub(super) fn unsubscribe(self) -> Result<(), ServerError> {
78        self.resource.unsubscribe()
79    }
80}
81
82/// Outcome of a server publish.
83///
84/// Carries the assigned message id plus a genuine delivery ack (`delivered` = the
85/// message was accepted by at least one live subscriber on this publish, after any
86/// dedup-on-delivery suppression).
87#[derive(Clone, Copy, Debug, PartialEq, Eq)]
88pub struct PublishOutcome {
89    /// Monotonic message id assigned to the accepted publish.
90    pub message_id: u64,
91    /// Whether the message was genuinely delivered to a subscriber. `false` means
92    /// the publish was accepted but reached no subscriber (empty channel) or was
93    /// a duplicate suppressed by dedup-on-delivery.
94    pub delivered: bool,
95}
96
97/// Operations that adapt wire frames to liminal library calls.
98pub trait ConnectionServices: std::fmt::Debug + Send + Sync {
99    /// Delegates a publish request to the liminal library.
100    ///
101    /// `idempotency_key`, when `Some`, drives dedup-on-delivery: a re-publish with
102    /// the same key is delivered to subscribers at most once. The returned
103    /// [`PublishOutcome`] carries the genuine delivery ack.
104    ///
105    /// # Errors
106    /// Returns [`ServerError`] when the liminal publish operation fails.
107    fn publish(
108        &self,
109        channel: &str,
110        envelope: &MessageEnvelope,
111        idempotency_key: Option<&str>,
112    ) -> Result<PublishOutcome, ServerError>;
113
114    /// Delegates a subscribe request to the liminal library.
115    ///
116    /// # Errors
117    /// Returns [`ServerError`] when the liminal subscribe operation fails.
118    fn subscribe(
119        &self,
120        channel: &str,
121        accepted_schemas: &[ProtocolSchemaId],
122    ) -> Result<ConnectionSubscription, ServerError>;
123
124    /// Delegates unsubscribe to the liminal library.
125    ///
126    /// # Errors
127    /// Returns [`ServerError`] when the liminal unsubscribe operation fails.
128    fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError>;
129
130    /// Delegates conversation open to the liminal library.
131    ///
132    /// # Errors
133    /// Returns [`ServerError`] when the liminal conversation open operation fails.
134    fn open_conversation(
135        &self,
136        conversation_id: u64,
137        subject: &str,
138    ) -> Result<ConnectionConversation, ServerError>;
139
140    /// Delegates a conversation message to the liminal library.
141    ///
142    /// # Errors
143    /// Returns [`ServerError`] when the liminal conversation message operation fails.
144    fn conversation_message(
145        &self,
146        conversation: &ConnectionConversation,
147        envelope: &MessageEnvelope,
148    ) -> Result<(), ServerError>;
149
150    /// Delegates conversation close to the liminal library.
151    ///
152    /// # Errors
153    /// Returns [`ServerError`] when the liminal conversation close operation fails.
154    fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError>;
155
156    /// Flushes durable channel state through the liminal library boundary.
157    ///
158    /// # Errors
159    /// Returns [`ServerError`] when the liminal channel flush operation fails.
160    fn flush_durable_state(&self) -> Result<(), ServerError>;
161}
162
163/// Default adapter from server wire frames to liminal channel/conversation APIs.
164#[derive(Debug)]
165pub struct LiminalConnectionServices {
166    channels: HashMap<String, ConfiguredChannel>,
167    cluster: ChannelCluster,
168    durable_store: Arc<dyn DurableStore>,
169    /// In-memory (haematite-backed) dedup cache for dedup-on-delivery. Keyed by
170    /// the per-message idempotency key carried on the publish frame; a duplicate
171    /// key is suppressed before fan-out so a subscriber receives it at most once.
172    /// Not persisted across restarts (13-L1 scope; durable dedup is deferred).
173    dedup: DedupCache,
174    conversation_supervisor: Arc<ConversationSupervisor>,
175    /// Registered custom conversation responders, keyed by conversation subject.
176    ///
177    /// When a conversation is opened (`open_conversation`), the subject is looked
178    /// up here: a registered [`ParticipantBehaviour`] becomes the conversation's
179    /// participant; with no registration the conversation falls back to the
180    /// built-in [`EchoBehaviour`], preserving the original echo semantics exactly.
181    /// This is the seam aion #13 plugs a remote worker responder into. Interior
182    /// mutability is required because the services are shared behind `&self`.
183    responders: Mutex<ResponderRegistry>,
184    next_message_id: AtomicU64,
185    next_subscription_id: AtomicU64,
186}
187
188impl LiminalConnectionServices {
189    /// Builds library-backed services from validated server configuration.
190    ///
191    /// Durable-mode channels are backed by a shared haematite event store so
192    /// their publishes are persisted and survive the graceful-shutdown flush;
193    /// ephemeral channels carry no store.
194    ///
195    /// # Errors
196    /// Returns [`ServerError`] when a configured channel cannot be initialized.
197    pub fn from_config(config: &ServerConfig) -> Result<Self, ServerError> {
198        let store = build_durable_store(config.persistence_path.as_deref())?;
199        Self::from_config_with_store(config, store)
200    }
201
202    /// Builds services over a caller-provided durable store.
203    ///
204    /// Used by tests that need to inspect persisted state through the same store
205    /// handle the durable channels write to.
206    ///
207    /// # Errors
208    /// Returns [`ServerError`] when a configured channel cannot be initialized.
209    pub fn from_config_with_store(
210        config: &ServerConfig,
211        durable_store: Arc<dyn DurableStore>,
212    ) -> Result<Self, ServerError> {
213        // Build ONE shared channel supervisor for the whole server. When a
214        // [cluster] section is present it is distribution-enabled, so every
215        // channel actor and subscriber shares the clustered scheduler the cluster
216        // attaches its process-group transport to (SRV-005, Constraint B).
217        let cluster = build_channel_cluster(config.cluster.as_ref())?;
218        let mut channels = HashMap::new();
219        for channel in &config.channels {
220            let schema = Schema::new(serde_json::json!({})).map_err(|error| {
221                ServerError::ConfigValidation {
222                    message: format!("failed to initialize channel '{}': {error}", channel.name),
223                }
224            })?;
225            let channel_config = if channel.durable {
226                ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Durable)
227            } else {
228                ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Ephemeral)
229            };
230            let handle = if channel.durable {
231                ChannelHandle::new_durable_with_supervisor(
232                    channel_config,
233                    Arc::clone(&durable_store),
234                    cluster.supervisor().clone(),
235                )
236                .map_err(|error| ServerError::ConfigValidation {
237                    message: format!(
238                        "failed to initialize durable channel '{}': {error}",
239                        channel.name
240                    ),
241                })?
242            } else {
243                ChannelHandle::with_supervisor(channel_config, cluster.supervisor().clone())
244            };
245            channels.insert(
246                channel.name.clone(),
247                ConfiguredChannel {
248                    handle,
249                    protocol_schema: schema_ref_id(&channel.schema_ref),
250                },
251            );
252        }
253        let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
254            ServerError::ConfigValidation {
255                message: format!("failed to start conversation supervisor: {error}"),
256            }
257        })?);
258        let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
259        Ok(Self {
260            channels,
261            cluster,
262            durable_store,
263            dedup,
264            conversation_supervisor,
265            responders: Mutex::new(HashMap::new()),
266            next_message_id: AtomicU64::new(1),
267            next_subscription_id: AtomicU64::new(1),
268        })
269    }
270
271    /// Builds services with no configured channels.
272    ///
273    /// # Errors
274    /// Returns [`ServerError`] when the conversation supervisor scheduler cannot start.
275    pub fn empty() -> Result<Self, ServerError> {
276        let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
277            ServerError::ConfigValidation {
278                message: format!("failed to start conversation supervisor: {error}"),
279            }
280        })?);
281        let durable_store = build_durable_store(None)?;
282        let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
283        Ok(Self {
284            channels: HashMap::new(),
285            cluster: build_channel_cluster(None)?,
286            durable_store,
287            dedup,
288            conversation_supervisor,
289            responders: Mutex::new(HashMap::new()),
290            next_message_id: AtomicU64::new(1),
291            next_subscription_id: AtomicU64::new(1),
292        })
293    }
294
295    /// The shared channel supervisor + cluster resolver backing this service.
296    ///
297    /// The server runtime uses this to attach the cluster to the channel
298    /// supervisor's clustered scheduler (SRV-005).
299    #[must_use]
300    pub const fn channel_cluster(&self) -> &ChannelCluster {
301        &self.cluster
302    }
303
304    /// Returns the shared durable store backing this service's durable channels.
305    #[must_use]
306    pub fn durable_store(&self) -> Arc<dyn DurableStore> {
307        Arc::clone(&self.durable_store)
308    }
309
310    /// Returns the conversation supervisor backing supervised conversations.
311    ///
312    /// Tests use this to reach the underlying beamr scheduler so they can spawn
313    /// or terminate participant processes and exercise crash detection.
314    #[must_use]
315    pub fn conversation_supervisor(&self) -> Arc<ConversationSupervisor> {
316        Arc::clone(&self.conversation_supervisor)
317    }
318
319    /// Registers a custom conversation responder for a routing `subject`.
320    ///
321    /// When a conversation is later opened with this exact `subject`, its
322    /// participant runs `behaviour` instead of the built-in [`EchoBehaviour`].
323    /// The responder is spawned and supervised identically to the echo
324    /// participant — a real linked beamr process with the same crash-detection
325    /// semantics — so this exposes the responder seam without changing how
326    /// participants run. Registering a subject that already has a responder
327    /// replaces it; the previous behaviour is returned.
328    ///
329    /// This is the liminal-side seam aion #13 plugs a remote worker into: it
330    /// registers a responder that forwards each request to the worker and routes
331    /// the worker's reply back through the conversation. Subjects with no
332    /// registration keep echoing, so existing callers are unaffected.
333    ///
334    /// # Errors
335    /// Returns [`ServerError`] when the responder registry lock is poisoned.
336    pub fn register_responder(
337        &self,
338        subject: impl Into<String>,
339        behaviour: Arc<dyn ParticipantBehaviour>,
340    ) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
341        let mut responders = self.lock_responders()?;
342        Ok(responders.insert(subject.into(), behaviour))
343    }
344
345    /// Removes the custom responder registered for `subject`, if any.
346    ///
347    /// After removal the subject reverts to the built-in [`EchoBehaviour`] on the
348    /// next [`Self::open_conversation`]. Returns the removed behaviour when one
349    /// was registered.
350    ///
351    /// # Errors
352    /// Returns [`ServerError`] when the responder registry lock is poisoned.
353    pub fn unregister_responder(
354        &self,
355        subject: &str,
356    ) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
357        let mut responders = self.lock_responders()?;
358        Ok(responders.remove(subject))
359    }
360
361    /// Resolves the responder behaviour for `subject`: the registered custom
362    /// responder when present, otherwise the built-in [`EchoBehaviour`].
363    ///
364    /// This is the single routing decision behind the seam — registered-or-echo —
365    /// so the fallback is identical to the original hard-wired echo path.
366    fn responder_for(&self, subject: &str) -> Result<Arc<dyn ParticipantBehaviour>, ServerError> {
367        let responders = self.lock_responders()?;
368        Ok(responders.get(subject).map_or_else(
369            || Arc::new(EchoBehaviour) as Arc<dyn ParticipantBehaviour>,
370            Arc::clone,
371        ))
372    }
373
374    /// Locks the responder registry, mapping a poisoned lock to a [`ServerError`]
375    /// rather than panicking (the workspace denies `unwrap`/`expect`/`panic`).
376    fn lock_responders(&self) -> Result<std::sync::MutexGuard<'_, ResponderRegistry>, ServerError> {
377        self.responders
378            .lock()
379            .map_err(|_poisoned| ServerError::ListenerAccept {
380                message: "responder registry lock poisoned".to_owned(),
381            })
382    }
383
384    /// Subscribes to a configured channel and returns the raw library
385    /// subscription handle so a test can drain the subscriber inbox directly and
386    /// observe exactly which messages reached a subscriber.
387    #[cfg(test)]
388    pub(crate) fn subscribe_handle_for_test(
389        &self,
390        channel: &str,
391    ) -> Result<liminal::channel::SubscriptionHandle, ServerError> {
392        let configured = self
393            .channels
394            .get(channel)
395            .ok_or_else(|| ServerError::ListenerAccept {
396                message: format!("channel '{channel}' is not configured"),
397            })?;
398        configured
399            .handle
400            .subscribe()
401            .map_err(|error| ServerError::ListenerAccept {
402                message: format!("liminal subscribe failed for channel '{channel}': {error}"),
403            })
404    }
405
406    /// Claims the delivery right for an idempotency key.
407    ///
408    /// Returns `Ok(true)` when this is the first publish for the key (the caller
409    /// may deliver), and `Ok(false)` when the key was already claimed/completed (a
410    /// duplicate the caller must suppress). The dedup cache is driven synchronously
411    /// over the in-memory haematite store via the durable bridge.
412    fn claim_delivery(&self, key: &str) -> Result<bool, ServerError> {
413        let decision = block_on(self.dedup.claim_or_get(key, dedup_timestamp_millis()))
414            .map_err(|error| ServerError::ListenerAccept {
415                message: format!("dedup bridge failed for key '{key}': {error}"),
416            })?
417            .map_err(|error| ServerError::ListenerAccept {
418                message: format!("dedup claim failed for key '{key}': {error}"),
419            })?;
420        Ok(matches!(decision, DedupDecision::Claimed))
421    }
422
423    /// Releases a dangling in-flight dedup claim after a failed delivery.
424    ///
425    /// Best-effort: a release failure cannot mask the original publish error, so
426    /// this returns nothing and logs at `error` level instead of surfacing. It is
427    /// never silent — the leak (a permanently suppressed key) must be observable.
428    /// `release_claim` itself never clobbers a stored receipt, so calling it on the
429    /// failure path is safe even if a concurrent completion raced ahead.
430    fn release_claim(&self, key: &str) {
431        match block_on(self.dedup.release_claim(key)) {
432            Ok(Ok(())) => {}
433            Ok(Err(error)) => {
434                tracing::error!(
435                    idempotency_key = key,
436                    %error,
437                    "failed to release dedup claim after publish failure; key may stay suppressed"
438                );
439            }
440            Err(error) => {
441                tracing::error!(
442                    idempotency_key = key,
443                    %error,
444                    "dedup release bridge failed after publish failure; key may stay suppressed"
445                );
446            }
447        }
448    }
449}
450
451/// Returns the current epoch-millis timestamp used as the dedup entry anchor.
452///
453/// A clock error before the Unix epoch yields `0`: the timestamp is only a TTL
454/// anchor for the in-memory cache and a zero anchor never breaks the at-most-once
455/// claim semantics, so this avoids surfacing a clock fault on the publish path.
456fn dedup_timestamp_millis() -> u64 {
457    use std::time::{SystemTime, UNIX_EPOCH};
458    SystemTime::now()
459        .duration_since(UNIX_EPOCH)
460        .ok()
461        .and_then(|duration| u64::try_from(duration.as_millis()).ok())
462        .unwrap_or(0)
463}
464
465/// Default shard count for an on-disk durable store.
466///
467/// Haematite routes keys across this many single-threaded shard actors; a small
468/// power of two gives parallelism across cursors/streams without spawning an
469/// actor per core. The value is fixed (haematite has no silent default) and not
470/// yet surfaced in server config.
471const DEFAULT_SHARD_COUNT: usize = 8;
472
473/// Namespace prefix for the dedup-on-delivery cache streams. Keeps delivery dedup
474/// keys from colliding with any other haematite streams in the shared store.
475const DELIVERY_DEDUP_NAMESPACE: &str = "liminal:delivery-dedup";
476
477/// Builds the on-disk haematite-backed durable store.
478///
479/// When `persistence_path` is `Some`, the database lives there and survives
480/// process restarts: an existing database directory is reopened, a fresh one is
481/// created. When it is `None` (no durable path configured, or the channel-free
482/// `empty()` services used by tests), an ephemeral per-instance directory under
483/// the system temp dir is created instead — it still persists to disk for the
484/// lifetime of the process, but is not a stable restart location.
485fn build_durable_store(
486    persistence_path: Option<&Path>,
487) -> Result<Arc<dyn DurableStore>, ServerError> {
488    let data_dir = persistence_path.map_or_else(ephemeral_data_dir, |path| path.join("durability"));
489    let database = open_or_create_database(&data_dir)?;
490    let event_store = EventStore::new(database);
491    Ok(Arc::new(HaematiteStore::new(Arc::new(event_store))))
492}
493
494/// Opens an existing haematite database at `data_dir`, or creates one.
495fn open_or_create_database(data_dir: &Path) -> Result<Database, ServerError> {
496    let config_file = data_dir.join("config.json");
497    let result = if config_file.exists() {
498        Database::open(data_dir)
499    } else {
500        Database::create(DatabaseConfig {
501            data_dir: data_dir.to_path_buf(),
502            shard_count: DEFAULT_SHARD_COUNT,
503            sweep_interval: None,
504            distributed: None,
505        })
506    };
507    result.map_err(|error| ServerError::ConfigValidation {
508        message: format!(
509            "failed to open durable store at {}: {error}",
510            data_dir.display()
511        ),
512    })
513}
514
515/// Produces a unique on-disk directory under the system temp dir.
516///
517/// The pid plus a monotonic counter keep concurrent servers and parallel tests
518/// from sharing a database directory.
519fn ephemeral_data_dir() -> PathBuf {
520    static COUNTER: AtomicU64 = AtomicU64::new(0);
521    let unique = COUNTER.fetch_add(1, Ordering::Relaxed);
522    std::env::temp_dir().join(format!(
523        "liminal-durability-{}-{unique}",
524        std::process::id()
525    ))
526}
527
528impl ConnectionServices for LiminalConnectionServices {
529    fn publish(
530        &self,
531        channel: &str,
532        envelope: &MessageEnvelope,
533        idempotency_key: Option<&str>,
534    ) -> Result<PublishOutcome, ServerError> {
535        let handle = self
536            .channels
537            .get(channel)
538            .map(|configured| configured.handle.clone())
539            .ok_or_else(|| ServerError::ListenerAccept {
540                message: format!("channel '{channel}' is not configured"),
541            })?;
542
543        // Dedup-on-delivery: a publish carrying an idempotency key is delivered to
544        // subscribers AT MOST ONCE across re-publishes of the same key. Only a
545        // fresh `Claimed` decision proceeds to fan-out; a `Completed`/`InFlight`
546        // decision is a duplicate and is suppressed (no second delivery), which is
547        // the at-most-once guarantee the aion outbox relies on.
548        if let Some(key) = idempotency_key {
549            if !self.claim_delivery(key)? {
550                return Ok(PublishOutcome {
551                    message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
552                    delivered: false,
553                });
554            }
555        }
556
557        let delivery = handle.publish_with_delivery(
558            &envelope.payload,
559            liminal::envelope::PublisherId::default(),
560            None,
561        );
562        let delivery = match delivery {
563            Ok(delivery) => delivery,
564            Err(error) => {
565                // The claim above appended an `InFlight` entry but the delivery
566                // failed before `complete_receipt` could run. Release the claim so
567                // the key is re-claimable; otherwise every re-publish would see
568                // `InFlight` and be suppressed forever. Best-effort: surface the
569                // ORIGINAL publish error regardless, but never swallow a release
570                // failure silently (it leaves the leak intact).
571                if let Some(key) = idempotency_key {
572                    self.release_claim(key);
573                }
574                return Err(ServerError::ListenerAccept {
575                    message: format!("liminal publish failed for channel '{channel}': {error}"),
576                });
577            }
578        };
579
580        // Record the dedup completion AFTER a successful claimed delivery so the
581        // claim is not left dangling `InFlight` (which would wrongly defer every
582        // future duplicate). The receipt body is empty: the dedup contract here
583        // only needs presence, not a stored result.
584        if let Some(key) = idempotency_key {
585            block_on(
586                self.dedup
587                    .complete_receipt(key, ProcessingReceipt::new(Vec::new())),
588            )
589            .map_err(|error| ServerError::ListenerAccept {
590                message: format!("dedup receipt bridge failed for key '{key}': {error}"),
591            })?
592            .map_err(|error| ServerError::ListenerAccept {
593                message: format!("dedup receipt write failed for key '{key}': {error}"),
594            })?;
595        }
596
597        Ok(PublishOutcome {
598            message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
599            delivered: delivery.is_delivered(),
600        })
601    }
602
603    fn subscribe(
604        &self,
605        channel: &str,
606        accepted_schemas: &[ProtocolSchemaId],
607    ) -> Result<ConnectionSubscription, ServerError> {
608        let configured = self
609            .channels
610            .get(channel)
611            .ok_or_else(|| ServerError::ListenerAccept {
612                message: format!("channel '{channel}' is not configured"),
613            })?;
614        let selected_schema = if accepted_schemas.is_empty() {
615            configured.protocol_schema
616        } else {
617            liminal::protocol::negotiate_schema(configured.protocol_schema, accepted_schemas)
618                .map_err(|error| server_error_from_protocol(&error))?
619        };
620        let subscription =
621            configured
622                .handle
623                .subscribe()
624                .map_err(|error| ServerError::ListenerAccept {
625                    message: format!("liminal subscribe failed for channel '{channel}': {error}"),
626                })?;
627        let id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
628        Ok(ConnectionSubscription::new(
629            id,
630            selected_schema,
631            Box::new(LiminalSubscriptionResource { subscription }),
632        ))
633    }
634
635    fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError> {
636        subscription.unsubscribe()
637    }
638
639    fn open_conversation(
640        &self,
641        conversation_id: u64,
642        subject: &str,
643    ) -> Result<ConnectionConversation, ServerError> {
644        // Spawn a REAL participant process (a beamr `NativeHandler` running the
645        // resolved responder behaviour) on the conversation supervisor's
646        // scheduler, and a supervised conversation actor linked to it. The actor
647        // FORWARDS each conversation message to the participant, which genuinely
648        // processes it and delivers a reply back. The actor traps the
649        // participant's EXIT (a beamr process link), so killing it fires a
650        // structural, microsecond-scale crash signal.
651        //
652        // The responder is chosen by `subject`: a custom responder registered via
653        // `register_responder` for this subject, or the built-in `EchoBehaviour`
654        // when none is registered. Either way it runs as the SAME supervised,
655        // linked participant process — the seam changes WHO responds, not HOW the
656        // participant is spawned or supervised.
657        let behaviour = self.responder_for(subject)?;
658        let (actor, participant) = self
659            .conversation_supervisor
660            .spawn_with_participant(behaviour, None, ChannelMode::Ephemeral, CrashPolicy::Fail)
661            .map_err(|error| ServerError::ListenerAccept {
662                message: format!(
663                    "failed to spawn supervised conversation {conversation_id} ('{subject}'): {error}"
664                ),
665            })?;
666
667        // Drive boot to completion so the beamr link to the participant exists
668        // before any message is forwarded (link-before-forward), mirroring the
669        // ROUTING-004 dispatch pattern.
670        actor.pid().map_err(|error| ServerError::ListenerAccept {
671            message: format!(
672                "failed to boot supervised conversation {conversation_id} ('{subject}'): {error}"
673            ),
674        })?;
675
676        // Register the structural EXIT notifier BEFORE returning, so a crash that
677        // fires the instant a message reaches the participant is never missed.
678        // The notifier is woken by the actor's trapped-EXIT handler (event
679        // driven), and a crash that already landed is replayed immediately.
680        let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
681        actor
682            .notify_on_participant_exit(participant, exit_tx)
683            .map_err(|error| ServerError::ListenerAccept {
684                message: format!(
685                    "failed to arm crash detection for conversation {conversation_id}: {error}"
686                ),
687            })?;
688
689        Ok(ConnectionConversation::new(Box::new(
690            LiminalConversationResource::new(actor, participant, exit_rx),
691        )))
692    }
693
694    fn conversation_message(
695        &self,
696        conversation: &ConnectionConversation,
697        envelope: &MessageEnvelope,
698    ) -> Result<(), ServerError> {
699        conversation.message(envelope)
700    }
701
702    fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError> {
703        conversation.close()
704    }
705
706    fn flush_durable_state(&self) -> Result<(), ServerError> {
707        for (channel_name, configured) in &self.channels {
708            if configured.handle.config().mode == ChannelMode::Durable {
709                configured
710                    .handle
711                    .flush()
712                    .map_err(|error| ServerError::ShutdownFlush {
713                        message: format!(
714                            "failed to flush durable channel '{channel_name}': {error}"
715                        ),
716                    })?;
717            }
718        }
719        Ok(())
720    }
721}
722
723#[derive(Debug)]
724struct ConfiguredChannel {
725    handle: ChannelHandle,
726    protocol_schema: ProtocolSchemaId,
727}
728
729#[derive(Debug)]
730struct LiminalSubscriptionResource {
731    subscription: liminal::channel::SubscriptionHandle,
732}
733
734impl SubscriptionResource for LiminalSubscriptionResource {
735    fn unsubscribe(self: Box<Self>) -> Result<(), ServerError> {
736        drop(self.subscription);
737        Ok(())
738    }
739}
740
741pub(super) fn server_error_from_protocol(error: &ProtocolError) -> ServerError {
742    ServerError::ListenerAccept {
743        message: format!("protocol operation failed: {error}"),
744    }
745}
746
747fn schema_ref_id(schema_ref: &str) -> ProtocolSchemaId {
748    let mut bytes = [0_u8; ProtocolSchemaId::WIRE_LEN];
749    let mut hash = std::collections::hash_map::DefaultHasher::new();
750    schema_ref.hash(&mut hash);
751    let seed = hash.finish().to_be_bytes();
752    for (index, byte) in bytes.iter_mut().enumerate() {
753        *byte = seed[index % seed.len()];
754    }
755    ProtocolSchemaId::new(bytes)
756}