use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use std::time::Instant;
use haematite::{Database, DatabaseConfig, EventStore};
use liminal::channel::{ChannelConfig, ChannelHandle, ChannelMode, Schema};
use liminal::conversation::{
ConversationSupervisor, CrashPolicy, EchoBehaviour, ParticipantBehaviour,
};
use liminal::durability::bridge::block_on;
use liminal::durability::{
DedupCache, DedupDecision, DurableStore, HaematiteStore, ProcessingReceipt,
};
use liminal::protocol::{MessageEnvelope, ProtocolError, SchemaId as ProtocolSchemaId};
use super::conversation::{ConnectionConversation, LiminalConversationResource};
use super::services_cluster::build_channel_cluster;
use crate::ServerError;
use crate::config::types::ServerConfig;
pub use super::services_cluster::ChannelCluster;
/// Registry mapping a conversation subject to the behaviour that answers it.
type ResponderRegistry = HashMap<String, Arc<dyn ParticipantBehaviour>>;
/// Backing resource of a subscription.
///
/// `unsubscribe` takes the boxed resource by value, so a resource can be
/// released at most once.
pub trait SubscriptionResource: std::fmt::Debug + Send {
/// Releases the underlying subscription, consuming the resource.
///
/// # Errors
/// Returns a [`ServerError`] when the implementation cannot release it.
fn unsubscribe(self: Box<Self>) -> Result<(), ServerError>;
}
/// A subscription held on behalf of a connection: an id, the schema selected
/// for it, and the resource that is released on unsubscribe.
#[derive(Debug)]
pub struct ConnectionSubscription {
// Identifier supplied by whoever constructs the subscription.
id: u64,
// Schema selected for this subscription.
selected_schema: ProtocolSchemaId,
// Owned resource consumed by `unsubscribe`.
resource: Box<dyn SubscriptionResource>,
}
impl ConnectionSubscription {
    /// Wraps `resource` together with its subscription `id` and the schema
    /// that was selected for it.
    #[must_use]
    pub fn new(
        id: u64,
        selected_schema: ProtocolSchemaId,
        resource: Box<dyn SubscriptionResource>,
    ) -> Self {
        Self { id, selected_schema, resource }
    }

    /// Returns the subscription identifier passed to [`Self::new`].
    #[must_use]
    pub const fn id(&self) -> u64 {
        self.id
    }

    /// Returns the schema passed to [`Self::new`].
    #[must_use]
    pub const fn selected_schema(&self) -> ProtocolSchemaId {
        self.selected_schema
    }

    /// Consumes the subscription and releases its backing resource.
    ///
    /// # Errors
    /// Propagates whatever the resource's `unsubscribe` returns.
    pub(super) fn unsubscribe(self) -> Result<(), ServerError> {
        let Self { resource, .. } = self;
        resource.unsubscribe()
    }
}
/// Result reported for a publish request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PublishOutcome {
/// Identifier assigned to the publish request.
pub message_id: u64,
/// Delivery flag for the request. `LiminalConnectionServices` also reports
/// `false` when an idempotency key marks the request as a duplicate.
pub delivered: bool,
}
/// Operations a connection can perform: channel publish/subscribe,
/// conversations, and flushing durable state.
///
/// Every method reports failure as a [`ServerError`].
pub trait ConnectionServices: std::fmt::Debug + Send + Sync {
/// Publishes `envelope` on `channel`.
///
/// `idempotency_key`, when present, identifies the request for duplicate
/// detection by the implementation.
fn publish(
&self,
channel: &str,
envelope: &MessageEnvelope,
idempotency_key: Option<&str>,
) -> Result<PublishOutcome, ServerError>;
/// Subscribes to `channel`, passing the schemas the subscriber accepts.
fn subscribe(
&self,
channel: &str,
accepted_schemas: &[ProtocolSchemaId],
) -> Result<ConnectionSubscription, ServerError>;
/// Ends `subscription`, consuming it.
fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError>;
/// Opens a conversation identified by `conversation_id` for `subject`.
fn open_conversation(
&self,
conversation_id: u64,
subject: &str,
) -> Result<ConnectionConversation, ServerError>;
/// Sends `envelope` into an open `conversation`.
fn conversation_message(
&self,
conversation: &ConnectionConversation,
envelope: &MessageEnvelope,
) -> Result<(), ServerError>;
/// Closes `conversation`, consuming it.
fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError>;
/// Flushes any durable state held by the implementation.
fn flush_durable_state(&self) -> Result<(), ServerError>;
}
/// [`ConnectionServices`] implementation backed by liminal channels, a durable
/// store, and a conversation supervisor.
#[derive(Debug)]
pub struct LiminalConnectionServices {
// Configured channels keyed by channel name.
channels: HashMap<String, ConfiguredChannel>,
// Cluster whose supervisor is handed to every channel handle.
cluster: ChannelCluster,
// Store shared by durable channels and the dedup cache.
durable_store: Arc<dyn DurableStore>,
// Idempotency-key cache consulted by `publish`.
dedup: DedupCache,
// Spawns the actors behind opened conversations.
conversation_supervisor: Arc<ConversationSupervisor>,
// Subject -> behaviour registry; subjects without an entry get `EchoBehaviour`.
responders: Mutex<ResponderRegistry>,
// Source of `PublishOutcome::message_id` values; starts at 1.
next_message_id: AtomicU64,
// Source of subscription ids; starts at 1.
next_subscription_id: AtomicU64,
}
impl LiminalConnectionServices {
/// Builds the services from `config`, opening the durable store under
/// `config.persistence_path` (or an ephemeral location when it is unset).
///
/// # Errors
/// Returns the error from opening the store or from
/// [`Self::from_config_with_store`].
pub fn from_config(config: &ServerConfig) -> Result<Self, ServerError> {
    let persistence_path = config.persistence_path.as_deref();
    let durable_store = build_durable_store(persistence_path)?;
    Self::from_config_with_store(config, durable_store)
}
pub fn from_config_with_store(
config: &ServerConfig,
durable_store: Arc<dyn DurableStore>,
) -> Result<Self, ServerError> {
let cluster = build_channel_cluster(config.cluster.as_ref())?;
let mut channels = HashMap::new();
for channel in &config.channels {
let schema = Schema::new(serde_json::json!({})).map_err(|error| {
ServerError::ConfigValidation {
message: format!("failed to initialize channel '{}': {error}", channel.name),
}
})?;
let channel_config = if channel.durable {
ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Durable)
} else {
ChannelConfig::new(channel.name.clone(), schema, ChannelMode::Ephemeral)
};
let handle = if channel.durable {
ChannelHandle::new_durable_with_supervisor(
channel_config,
Arc::clone(&durable_store),
cluster.supervisor().clone(),
)
.map_err(|error| ServerError::ConfigValidation {
message: format!(
"failed to initialize durable channel '{}': {error}",
channel.name
),
})?
} else {
ChannelHandle::with_supervisor(channel_config, cluster.supervisor().clone())
};
channels.insert(
channel.name.clone(),
ConfiguredChannel {
handle,
protocol_schema: schema_ref_id(&channel.schema_ref),
},
);
}
let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
ServerError::ConfigValidation {
message: format!("failed to start conversation supervisor: {error}"),
}
})?);
let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
Ok(Self {
channels,
cluster,
durable_store,
dedup,
conversation_supervisor,
responders: Mutex::new(HashMap::new()),
next_message_id: AtomicU64::new(1),
next_subscription_id: AtomicU64::new(1),
})
}
pub fn empty() -> Result<Self, ServerError> {
let conversation_supervisor = Arc::new(ConversationSupervisor::new().map_err(|error| {
ServerError::ConfigValidation {
message: format!("failed to start conversation supervisor: {error}"),
}
})?);
let durable_store = build_durable_store(None)?;
let dedup = DedupCache::new(Arc::clone(&durable_store), DELIVERY_DEDUP_NAMESPACE);
Ok(Self {
channels: HashMap::new(),
cluster: build_channel_cluster(None)?,
durable_store,
dedup,
conversation_supervisor,
responders: Mutex::new(HashMap::new()),
next_message_id: AtomicU64::new(1),
next_subscription_id: AtomicU64::new(1),
})
}
/// Borrows the channel cluster these services were built with.
#[must_use]
pub const fn channel_cluster(&self) -> &ChannelCluster {
&self.cluster
}
/// Returns a new `Arc` handle to the shared durable store.
#[must_use]
pub fn durable_store(&self) -> Arc<dyn DurableStore> {
Arc::clone(&self.durable_store)
}
/// Returns a new `Arc` handle to the conversation supervisor.
#[must_use]
pub fn conversation_supervisor(&self) -> Arc<ConversationSupervisor> {
Arc::clone(&self.conversation_supervisor)
}
/// Registers `behaviour` as the responder for `subject`.
///
/// Returns the behaviour previously registered for that subject, if any.
///
/// # Errors
/// Fails when the responder registry lock is poisoned.
pub fn register_responder(
    &self,
    subject: impl Into<String>,
    behaviour: Arc<dyn ParticipantBehaviour>,
) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
    let previous = self.lock_responders()?.insert(subject.into(), behaviour);
    Ok(previous)
}
/// Removes the responder registered for `subject`.
///
/// Returns the removed behaviour, or `None` when nothing was registered.
///
/// # Errors
/// Fails when the responder registry lock is poisoned.
pub fn unregister_responder(
    &self,
    subject: &str,
) -> Result<Option<Arc<dyn ParticipantBehaviour>>, ServerError> {
    let removed = self.lock_responders()?.remove(subject);
    Ok(removed)
}
/// Looks up the behaviour registered for `subject`, falling back to
/// `EchoBehaviour` when the subject has no registration.
///
/// # Errors
/// Fails when the responder registry lock is poisoned.
fn responder_for(&self, subject: &str) -> Result<Arc<dyn ParticipantBehaviour>, ServerError> {
    let registry = self.lock_responders()?;
    let behaviour = match registry.get(subject) {
        Some(registered) => Arc::clone(registered),
        None => Arc::new(EchoBehaviour) as Arc<dyn ParticipantBehaviour>,
    };
    Ok(behaviour)
}
/// Locks the responder registry.
///
/// # Errors
/// A poisoned lock is reported as `ServerError::ListenerAccept` rather than
/// being recovered.
fn lock_responders(&self) -> Result<std::sync::MutexGuard<'_, ResponderRegistry>, ServerError> {
    match self.responders.lock() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => Err(ServerError::ListenerAccept {
            message: "responder registry lock poisoned".to_owned(),
        }),
    }
}
/// Test-only helper that subscribes to `channel` and returns the raw liminal
/// subscription handle, without schema negotiation or id allocation.
///
/// # Errors
/// Fails when `channel` is not configured or the liminal subscribe call fails.
#[cfg(test)]
pub(crate) fn subscribe_handle_for_test(
    &self,
    channel: &str,
) -> Result<liminal::channel::SubscriptionHandle, ServerError> {
    let configured = match self.channels.get(channel) {
        Some(configured) => configured,
        None => {
            return Err(ServerError::ListenerAccept {
                message: format!("channel '{channel}' is not configured"),
            });
        }
    };
    match configured.handle.subscribe() {
        Ok(handle) => Ok(handle),
        Err(error) => Err(ServerError::ListenerAccept {
            message: format!("liminal subscribe failed for channel '{channel}': {error}"),
        }),
    }
}
fn claim_delivery(&self, key: &str) -> Result<bool, ServerError> {
let decision = block_on(self.dedup.claim_or_get(key, dedup_timestamp_millis()))
.map_err(|error| ServerError::ListenerAccept {
message: format!("dedup bridge failed for key '{key}': {error}"),
})?
.map_err(|error| ServerError::ListenerAccept {
message: format!("dedup claim failed for key '{key}': {error}"),
})?;
Ok(matches!(decision, DedupDecision::Claimed))
}
/// Releases a previously claimed idempotency `key`.
///
/// Failures are only logged: there is no caller-visible result, so a failed
/// release leaves the key claimed.
fn release_claim(&self, key: &str) {
    let released = block_on(self.dedup.release_claim(key));
    match released {
        Err(error) => {
            tracing::error!(
                idempotency_key = key,
                %error,
                "dedup release bridge failed after publish failure; key may stay suppressed"
            );
        }
        Ok(Err(error)) => {
            tracing::error!(
                idempotency_key = key,
                %error,
                "failed to release dedup claim after publish failure; key may stay suppressed"
            );
        }
        Ok(Ok(())) => {}
    }
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the clock reads earlier than the epoch or the millisecond
/// count does not fit in a `u64`.
fn dedup_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
/// Shard count passed in `DatabaseConfig` when a new database is created.
const DEFAULT_SHARD_COUNT: usize = 8;
/// Namespace handed to the `DedupCache` that tracks publish idempotency keys.
const DELIVERY_DEDUP_NAMESPACE: &str = "liminal:delivery-dedup";
fn build_durable_store(
persistence_path: Option<&Path>,
) -> Result<Arc<dyn DurableStore>, ServerError> {
let data_dir = persistence_path.map_or_else(ephemeral_data_dir, |path| path.join("durability"));
let database = open_or_create_database(&data_dir)?;
let event_store = EventStore::new(database);
Ok(Arc::new(HaematiteStore::new(Arc::new(event_store))))
}
fn open_or_create_database(data_dir: &Path) -> Result<Database, ServerError> {
let config_file = data_dir.join("config.json");
let result = if config_file.exists() {
Database::open(data_dir)
} else {
Database::create(DatabaseConfig {
data_dir: data_dir.to_path_buf(),
shard_count: DEFAULT_SHARD_COUNT,
sweep_interval: None,
distributed: None,
})
};
result.map_err(|error| ServerError::ConfigValidation {
message: format!(
"failed to open durable store at {}: {error}",
data_dir.display()
),
})
}
/// Picks a fresh directory path under the system temp directory for an
/// ephemeral durable store. The directory itself is not created here.
///
/// The name is `liminal-durability-<pid>-<counter>-<nanos-hex>`:
/// * `<counter>` separates stores created within one process;
/// * `<nanos-hex>` (wall-clock nanoseconds since the Unix epoch) separates
///   this process from an earlier one that had the same PID. Without it, a
///   recycled PID (for example PID 1 in a restarted container) would resolve
///   to a leftover directory, and `open_or_create_database` would reopen that
///   stale database instead of starting empty.
fn ephemeral_data_dir() -> PathBuf {
    use std::time::{SystemTime, UNIX_EPOCH};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let unique = COUNTER.fetch_add(1, Ordering::Relaxed);
    // A clock before the epoch degrades to 0; pid + counter still apply.
    let stamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    std::env::temp_dir().join(format!(
        "liminal-durability-{}-{unique}-{stamp:x}",
        std::process::id()
    ))
}
impl ConnectionServices for LiminalConnectionServices {
fn publish(
&self,
channel: &str,
envelope: &MessageEnvelope,
idempotency_key: Option<&str>,
) -> Result<PublishOutcome, ServerError> {
let handle = self
.channels
.get(channel)
.map(|configured| configured.handle.clone())
.ok_or_else(|| ServerError::ListenerAccept {
message: format!("channel '{channel}' is not configured"),
})?;
if let Some(key) = idempotency_key {
if !self.claim_delivery(key)? {
return Ok(PublishOutcome {
message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
delivered: false,
});
}
}
let delivery = handle.publish_with_delivery(
&envelope.payload,
liminal::envelope::PublisherId::default(),
None,
);
let delivery = match delivery {
Ok(delivery) => delivery,
Err(error) => {
if let Some(key) = idempotency_key {
self.release_claim(key);
}
return Err(ServerError::ListenerAccept {
message: format!("liminal publish failed for channel '{channel}': {error}"),
});
}
};
if let Some(key) = idempotency_key {
block_on(
self.dedup
.complete_receipt(key, ProcessingReceipt::new(Vec::new())),
)
.map_err(|error| ServerError::ListenerAccept {
message: format!("dedup receipt bridge failed for key '{key}': {error}"),
})?
.map_err(|error| ServerError::ListenerAccept {
message: format!("dedup receipt write failed for key '{key}': {error}"),
})?;
}
Ok(PublishOutcome {
message_id: self.next_message_id.fetch_add(1, Ordering::Relaxed),
delivered: delivery.is_delivered(),
})
}
fn subscribe(
&self,
channel: &str,
accepted_schemas: &[ProtocolSchemaId],
) -> Result<ConnectionSubscription, ServerError> {
let configured = self
.channels
.get(channel)
.ok_or_else(|| ServerError::ListenerAccept {
message: format!("channel '{channel}' is not configured"),
})?;
let selected_schema = if accepted_schemas.is_empty() {
configured.protocol_schema
} else {
liminal::protocol::negotiate_schema(configured.protocol_schema, accepted_schemas)
.map_err(|error| server_error_from_protocol(&error))?
};
let subscription =
configured
.handle
.subscribe()
.map_err(|error| ServerError::ListenerAccept {
message: format!("liminal subscribe failed for channel '{channel}': {error}"),
})?;
let id = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
Ok(ConnectionSubscription::new(
id,
selected_schema,
Box::new(LiminalSubscriptionResource { subscription }),
))
}
/// Ends `subscription` by delegating to its own `unsubscribe`.
fn unsubscribe(&self, subscription: ConnectionSubscription) -> Result<(), ServerError> {
subscription.unsubscribe()
}
/// Opens a supervised conversation for `subject`.
///
/// The responder registered for `subject` (or the echo fallback) is spawned
/// with an ephemeral channel mode and `CrashPolicy::Fail`. `actor.pid()` is
/// called and its failure reported as a boot failure. A one-slot channel is
/// then registered to receive the participant's exit notification.
/// `conversation_id` is only used in error messages.
///
/// # Errors
/// Returns `ServerError::ListenerAccept` when spawning, booting, or arming the
/// exit notification fails, and propagates responder lookup errors.
fn open_conversation(
    &self,
    conversation_id: u64,
    subject: &str,
) -> Result<ConnectionConversation, ServerError> {
    let behaviour = self.responder_for(subject)?;

    let spawned = self.conversation_supervisor.spawn_with_participant(
        behaviour,
        None,
        ChannelMode::Ephemeral,
        CrashPolicy::Fail,
    );
    let (actor, participant) = match spawned {
        Ok(pair) => pair,
        Err(error) => {
            return Err(ServerError::ListenerAccept {
                message: format!(
                    "failed to spawn supervised conversation {conversation_id} ('{subject}'): {error}"
                ),
            });
        }
    };

    // The pid value itself is not needed; only a failure is acted on.
    if let Err(error) = actor.pid() {
        return Err(ServerError::ListenerAccept {
            message: format!(
                "failed to boot supervised conversation {conversation_id} ('{subject}'): {error}"
            ),
        });
    }

    let (exit_tx, exit_rx) = mpsc::sync_channel::<Instant>(1);
    if let Err(error) = actor.notify_on_participant_exit(participant, exit_tx) {
        return Err(ServerError::ListenerAccept {
            message: format!(
                "failed to arm crash detection for conversation {conversation_id}: {error}"
            ),
        });
    }

    let resource = LiminalConversationResource::new(actor, participant, exit_rx);
    Ok(ConnectionConversation::new(Box::new(resource)))
}
/// Forwards `envelope` to `conversation` via its `message` method.
fn conversation_message(
&self,
conversation: &ConnectionConversation,
envelope: &MessageEnvelope,
) -> Result<(), ServerError> {
conversation.message(envelope)
}
/// Closes `conversation` by delegating to its own `close`.
fn close_conversation(&self, conversation: ConnectionConversation) -> Result<(), ServerError> {
conversation.close()
}
/// Flushes every channel configured with `ChannelMode::Durable`.
///
/// All durable channels are attempted even when one of them fails, so a single
/// bad channel does not leave the others unflushed. Channels in other modes
/// are skipped.
///
/// # Errors
/// Returns `ServerError::ShutdownFlush` when at least one flush failed. The
/// message lists every failure, separated by `"; "`; with a single failure it
/// is exactly `failed to flush durable channel '<name>': <error>`.
fn flush_durable_state(&self) -> Result<(), ServerError> {
    let mut failures: Vec<String> = Vec::new();
    for (channel_name, configured) in &self.channels {
        if configured.handle.config().mode != ChannelMode::Durable {
            continue;
        }
        // Record the failure and keep going instead of bailing out early.
        if let Err(error) = configured.handle.flush() {
            failures.push(format!(
                "failed to flush durable channel '{channel_name}': {error}"
            ));
        }
    }

    if failures.is_empty() {
        Ok(())
    } else {
        Err(ServerError::ShutdownFlush {
            message: failures.join("; "),
        })
    }
}
}
/// A channel built from configuration, paired with its protocol schema id.
#[derive(Debug)]
struct ConfiguredChannel {
// Handle used to publish, subscribe, and flush.
handle: ChannelHandle,
// Schema id derived from the channel's configured `schema_ref`.
protocol_schema: ProtocolSchemaId,
}
/// [`SubscriptionResource`] that owns a liminal subscription handle.
#[derive(Debug)]
struct LiminalSubscriptionResource {
// Handle that is dropped when the subscription is released.
subscription: liminal::channel::SubscriptionHandle,
}
impl SubscriptionResource for LiminalSubscriptionResource {
    /// Releases the subscription by dropping the owned handle; always succeeds.
    fn unsubscribe(self: Box<Self>) -> Result<(), ServerError> {
        let Self { subscription } = *self;
        drop(subscription);
        Ok(())
    }
}
/// Wraps a protocol error in `ServerError::ListenerAccept`, keeping its
/// display text in the message.
pub(super) fn server_error_from_protocol(error: &ProtocolError) -> ServerError {
    let message = format!("protocol operation failed: {error}");
    ServerError::ListenerAccept { message }
}
/// Derives a protocol schema id from a configured schema reference.
///
/// The reference is hashed to 64 bits and the eight big-endian hash bytes are
/// repeated cyclically until `ProtocolSchemaId::WIRE_LEN` bytes are filled.
///
/// NOTE(review): `DefaultHasher`'s algorithm is documented as unspecified
/// across Rust releases, so ids may differ between builds with different
/// toolchains; confirm whether peers rely on these values.
fn schema_ref_id(schema_ref: &str) -> ProtocolSchemaId {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    schema_ref.hash(&mut hasher);
    let digest = hasher.finish().to_be_bytes();

    let mut wire = [0_u8; ProtocolSchemaId::WIRE_LEN];
    for (slot, source) in wire.iter_mut().zip(digest.iter().cycle()) {
        *slot = *source;
    }
    ProtocolSchemaId::new(wire)
}