use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
use serde_json::Value;
use crate::causal::CausalContext;
use crate::channel::actor::{ChannelActorCore, predicate_from};
use crate::channel::observer::ClusterObserver;
use crate::channel::schema::{Schema, SchemaId, SchemaValidationError};
use crate::channel::subscription::{SubscriptionHandle, SubscriptionPredicate};
use crate::channel::supervisor::{ChannelSupervisor, shared_supervisor};
use crate::durability::bridge::block_on;
use crate::durability::{DurableChannel, DurableStore, MessageEnvelope};
use crate::envelope::{Envelope, PublisherId};
use crate::error::LiminalError;
/// Partition count passed to `DurableChannel::new` by the durable constructors in this file.
const RUNTIME_DURABLE_PARTITIONS: usize = 1;
/// Outcome of a single publish, as handed back to the caller.
///
/// The only information carried is the delivery count reported for that publish.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChannelDelivery {
    /// Delivery count recorded for the publish.
    delivered_count: usize,
}

impl ChannelDelivery {
    /// Returns the recorded delivery count.
    #[must_use]
    pub const fn delivered_count(&self) -> usize {
        let Self { delivered_count } = *self;
        delivered_count
    }

    /// Returns `true` when the recorded delivery count is non-zero.
    #[must_use]
    pub const fn is_delivered(&self) -> bool {
        self.delivered_count() != 0
    }
}
/// Mode recorded in a channel's `ChannelConfig`.
///
/// NOTE(review): nothing in this file reads `ChannelConfig::mode`; whether a handle persists
/// messages is decided by which `ChannelHandle` constructor built it. Confirm where the mode
/// is actually enforced.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelMode {
/// Presumably a channel without durable storage — TODO confirm against callers.
Ephemeral,
/// Presumably a channel backed by a durable store — TODO confirm against callers.
Durable,
}
/// Alias for `Schema`.
pub type SchemaRef = Schema;
/// Configuration a `ChannelHandle` is built from.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
/// Channel name; used in this file for durable-channel creation, observer callbacks and
/// error messages.
pub name: String,
/// Schema handed to the supervisor when the channel actor is first spawned.
pub schema: Schema,
/// Requested channel mode (not consulted by the code in this file).
pub mode: ChannelMode,
}
impl ChannelConfig {
/// Builds a configuration from its three fields, storing each argument unchanged.
#[must_use]
pub const fn new(name: String, schema: Schema, mode: ChannelMode) -> Self {
Self { name, schema, mode }
}
}
/// State held behind an `Arc` by `ChannelHandle`: the supervisor, the lazily spawned actor
/// core and a restart counter.
struct ChannelActorState {
/// Supervisor used to spawn and check the actor, or the text of the error that made it
/// unavailable.
supervisor: Result<ChannelSupervisor, String>,
/// Result of the one-time spawn attempt. An `Err` is stored as well, so a failed spawn is
/// not retried.
core: OnceLock<Result<Arc<ChannelActorCore>, String>>,
/// Counter handed to `ChannelSupervisor::ensure_running` on every core lookup.
restarts: AtomicU32,
}
impl ChannelActorState {
    /// Wraps the supervisor lookup result; the actor core stays unspawned until first use and
    /// the restart counter starts at zero.
    const fn new(supervisor: Result<ChannelSupervisor, String>) -> Self {
        Self {
            restarts: AtomicU32::new(0),
            core: OnceLock::new(),
            supervisor,
        }
    }

    /// Borrows the supervisor.
    ///
    /// # Errors
    /// Returns `LiminalError::PublishFailed` carrying the stored message when no supervisor
    /// was available at construction time.
    fn supervisor(&self) -> Result<&ChannelSupervisor, LiminalError> {
        match &self.supervisor {
            Ok(supervisor) => Ok(supervisor),
            Err(message) => Err(LiminalError::PublishFailed {
                message: format!("channel supervisor unavailable: {message}"),
            }),
        }
    }

    /// Returns a clone of the supervisor's observer, or `None` when there is no supervisor or
    /// the supervisor has no observer.
    fn observer(&self) -> Option<Arc<dyn ClusterObserver>> {
        let supervisor = self.supervisor.as_ref().ok()?;
        supervisor.observer().cloned()
    }

    /// Returns the channel actor core, spawning it with `schema` on the first call.
    ///
    /// The spawn result is cached in the `OnceLock`, including a failure, so later calls
    /// report the same spawn error. On every successful lookup the supervisor's
    /// `ensure_running` is invoked with the restart counter before the core is returned.
    ///
    /// # Errors
    /// Returns `LiminalError::PublishFailed` when the supervisor is unavailable or the spawn
    /// failed, and propagates any error from `ensure_running`.
    fn core(&self, schema: &Schema) -> Result<Arc<ChannelActorCore>, LiminalError> {
        let supervisor = self.supervisor()?;
        let spawned = self.core.get_or_init(|| {
            supervisor
                .spawn_channel(schema.clone())
                .map_err(|error| error.to_string())
        });
        match spawned {
            Ok(core) => {
                supervisor.ensure_running(core, &self.restarts)?;
                Ok(Arc::clone(core))
            }
            Err(message) => Err(LiminalError::PublishFailed {
                message: format!("channel actor unavailable: {message}"),
            }),
        }
    }
}
impl std::fmt::Debug for ChannelActorState {
    /// Prints only the `supervisor` field and marks the struct output as non-exhaustive.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("ChannelActorState");
        builder.field("supervisor", &self.supervisor);
        builder.finish_non_exhaustive()
    }
}
/// Cloneable handle to one channel; clones share the actor state and the durable channel
/// through `Arc`s.
#[derive(Clone, Debug)]
pub struct ChannelHandle {
/// Configuration the handle was created with.
config: ChannelConfig,
/// Shared supervisor / actor-core state.
actor: Arc<ChannelActorState>,
/// Durable channel written to on publish and flushed by `flush`; `None` for handles built
/// without a store.
durable: Option<Arc<Mutex<DurableChannel>>>,
}
impl ChannelHandle {
#[must_use]
pub fn new(config: ChannelConfig) -> Self {
let supervisor = shared_supervisor().map_err(|error| error.to_string());
Self {
config,
actor: Arc::new(ChannelActorState::new(supervisor)),
durable: None,
}
}
/// Creates a handle without durable storage that uses the given `supervisor` instead of the
/// shared one.
#[must_use]
pub fn with_supervisor(config: ChannelConfig, supervisor: ChannelSupervisor) -> Self {
Self {
config,
actor: Arc::new(ChannelActorState::new(Ok(supervisor))),
durable: None,
}
}
/// Creates a handle that also writes every published message to a durable channel backed by
/// `store`, using the shared supervisor.
///
/// The durable channel is created first (named after `config.name`, with
/// `RUNTIME_DURABLE_PARTITIONS` partitions); the shared supervisor is looked up afterwards.
///
/// # Errors
/// Returns `LiminalError::PublishFailed` when the durable channel cannot be initialized, and
/// propagates the error from the shared-supervisor lookup.
pub fn new_durable(
    config: ChannelConfig,
    store: Arc<dyn DurableStore>,
) -> Result<Self, LiminalError> {
    let durable =
        match DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store) {
            Ok(channel) => channel,
            Err(error) => {
                return Err(LiminalError::PublishFailed {
                    message: format!(
                        "failed to initialize durable channel '{}': {error}",
                        config.name
                    ),
                });
            }
        };
    let actor = ChannelActorState::new(Ok(shared_supervisor()?));
    Ok(Self {
        config,
        actor: Arc::new(actor),
        durable: Some(Arc::new(Mutex::new(durable))),
    })
}
/// Creates a handle that also writes every published message to a durable channel backed by
/// `store`, using the given `supervisor` instead of the shared one.
///
/// The durable channel is named after `config.name` and created with
/// `RUNTIME_DURABLE_PARTITIONS` partitions.
///
/// # Errors
/// Returns `LiminalError::PublishFailed` when the durable channel cannot be initialized.
pub fn new_durable_with_supervisor(
    config: ChannelConfig,
    store: Arc<dyn DurableStore>,
    supervisor: ChannelSupervisor,
) -> Result<Self, LiminalError> {
    let durable =
        match DurableChannel::new(config.name.clone(), RUNTIME_DURABLE_PARTITIONS, store) {
            Ok(channel) => channel,
            Err(error) => {
                return Err(LiminalError::PublishFailed {
                    message: format!(
                        "failed to initialize durable channel '{}': {error}",
                        config.name
                    ),
                });
            }
        };
    let actor = ChannelActorState::new(Ok(supervisor));
    Ok(Self {
        config,
        actor: Arc::new(actor),
        durable: Some(Arc::new(Mutex::new(durable))),
    })
}
/// Returns the configuration this handle was created with.
#[must_use]
pub const fn config(&self) -> &ChannelConfig {
&self.config
}
/// Publishes `payload` with `PublisherId::default()` and no causal context.
///
/// Delegates to `publish_with_context`, so the delivery count is discarded.
///
/// # Errors
/// Propagates any error from `publish_with_context`.
pub fn publish<Payload>(&self, payload: Payload) -> Result<(), LiminalError>
where
Payload: AsRef<[u8]>,
{
self.publish_with_context(payload, PublisherId::default(), None)
}
/// Publishes `payload` on behalf of `publisher_id`, with no causal context.
///
/// Note the argument order: the publisher comes first here, whereas `publish_with_context`
/// takes the payload first.
///
/// # Errors
/// Propagates any error from `publish_with_context`.
pub fn publish_from<Payload>(
&self,
publisher_id: impl Into<PublisherId>,
payload: Payload,
) -> Result<(), LiminalError>
where
Payload: AsRef<[u8]>,
{
self.publish_with_context(payload, publisher_id.into(), None)
}
/// Publishes `payload` for `publisher_id` with an optional causal context and discards the
/// delivery report.
///
/// # Errors
/// Propagates any error from `publish_with_delivery`.
pub fn publish_with_context<Payload>(
    &self,
    payload: Payload,
    publisher_id: PublisherId,
    causal_context: Option<CausalContext>,
) -> Result<(), LiminalError>
where
    Payload: AsRef<[u8]>,
{
    let _delivery = self.publish_with_delivery(payload, publisher_id, causal_context)?;
    Ok(())
}
/// Publishes `payload` and reports how many deliveries the channel actor counted.
///
/// Steps, in order:
/// 1. when the handle has a durable channel, the payload is persisted through
///    `persist_durable`;
/// 2. the actor core is looked up and asked to publish a copy of the payload;
/// 3. the supervisor's observer, if any, is notified with the channel name and the envelope
///    from the publish outcome.
///
/// NOTE(review): the durable write happens before the actor lookup and publish, so a message
/// can be persisted even though this call then returns an error. The causal context is also
/// not forwarded to `persist_durable`. Confirm both are intended.
///
/// # Errors
/// Propagates errors from `persist_durable`, from the core lookup and from the core's
/// `publish`.
pub fn publish_with_delivery<Payload>(
    &self,
    payload: Payload,
    publisher_id: PublisherId,
    causal_context: Option<CausalContext>,
) -> Result<ChannelDelivery, LiminalError>
where
    Payload: AsRef<[u8]>,
{
    let bytes = payload.as_ref();
    if let Some(durable) = &self.durable {
        self.persist_durable(durable, bytes, &publisher_id)?;
    }

    let actor_core = self.core()?;
    let outcome = actor_core.publish(bytes.to_vec(), publisher_id, causal_context)?;

    if let Some(observer) = self.actor.observer() {
        observer.on_publish(&self.config.name, &outcome.envelope);
    }

    let delivered_count = outcome.delivered_count;
    Ok(ChannelDelivery { delivered_count })
}
fn persist_durable(
&self,
durable: &Arc<Mutex<DurableChannel>>,
payload: &[u8],
publisher_id: &PublisherId,
) -> Result<(), LiminalError> {
let envelope = MessageEnvelope {
payload: payload.to_vec(),
causal_context: None,
timestamp: now_millis(),
publisher_id: publisher_id.as_str().to_owned(),
idempotency_key: None,
};
let publish_result = {
let mut channel = durable
.lock()
.map_err(|error| LiminalError::PublishFailed {
message: format!("durable channel state unavailable: {error}"),
})?;
block_on(channel.publish(&envelope))
};
publish_result
.map_err(|error| LiminalError::PublishFailed {
message: format!(
"durable publish bridge for channel '{}' failed: {error}",
self.config.name
),
})?
.map_err(|error| LiminalError::PublishFailed {
message: format!(
"durable publish to channel '{}' failed: {error}",
self.config.name
),
})?;
Ok(())
}
/// Returns the schema id reported by the channel actor core.
///
/// # Errors
/// Propagates errors from the core lookup and from the core's `schema_id`.
pub fn current_schema_id(&self) -> Result<SchemaId, LiminalError> {
self.core()?.schema_id()
}
/// Forwards a field name, its schema and a default value to the actor core's `evolve` and
/// returns the schema id it reports.
///
/// # Errors
/// A failed core lookup is reported as `SchemaValidationError::InvalidSchema` carrying the
/// text of the underlying error; errors from `evolve` are returned unchanged.
pub fn evolve_schema_add_field(
    &self,
    name: impl Into<String>,
    field_schema: Value,
    default: Value,
) -> Result<SchemaId, SchemaValidationError> {
    let actor_core = match self.core() {
        Ok(core) => core,
        Err(error) => {
            return Err(SchemaValidationError::InvalidSchema {
                message: error.to_string(),
            });
        }
    };
    actor_core.evolve(name.into(), field_schema, default)
}
/// Creates a subscription with no predicate.
///
/// # Errors
/// Propagates any error from `subscribe_inner`.
pub fn subscribe(&self) -> Result<SubscriptionHandle, LiminalError> {
self.subscribe_inner(None)
}
/// Creates a subscription whose predicate is `predicate` converted through `predicate_from`.
///
/// NOTE(review): presumably only envelopes for which `predicate` returns `true` reach the
/// subscriber — confirm in the subscription module.
///
/// # Errors
/// Propagates any error from `subscribe_inner`.
pub fn subscribe_filtered<F>(&self, predicate: F) -> Result<SubscriptionHandle, LiminalError>
where
F: Fn(&Envelope) -> bool + Send + Sync + 'static,
{
self.subscribe_inner(Some(predicate_from(predicate)))
}
/// Shared implementation of `subscribe` and `subscribe_filtered`.
///
/// Spawns a subscription on the actor core's scheduler, registers it with the core, and then
/// notifies the supervisor's observer (if any) with the channel name and the subscriber pid.
///
/// # Errors
/// Propagates errors from the core lookup, from `SubscriptionHandle::spawn` and from the
/// core's `subscribe`.
fn subscribe_inner(
    &self,
    predicate: Option<SubscriptionPredicate>,
) -> Result<SubscriptionHandle, LiminalError> {
    let actor_core = self.core()?;
    let (subscription, registration) =
        SubscriptionHandle::spawn(actor_core.scheduler(), predicate)?;

    // Read the pid before the registration is moved into the core.
    let subscriber_pid = registration.pid();
    actor_core.subscribe(registration)?;

    if let Some(observer) = self.actor.observer() {
        observer.on_subscribe(&self.config.name, subscriber_pid);
    }
    Ok(subscription)
}
/// Removes `subscription` from the channel by its pid and then notifies the supervisor's
/// observer (if any) with the channel name and that pid.
///
/// # Errors
/// Propagates errors from the core lookup and from the core's `unsubscribe`.
pub fn unsubscribe(&self, subscription: &SubscriptionHandle) -> Result<(), LiminalError> {
    let subscriber_pid = subscription.pid();
    let actor_core = self.core()?;
    actor_core.unsubscribe(subscriber_pid)?;

    if let Some(observer) = self.actor.observer() {
        observer.on_unsubscribe(&self.config.name, subscriber_pid);
    }
    Ok(())
}
pub fn flush(&self) -> Result<(), LiminalError> {
drop(self.core()?);
let Some(durable) = self.durable.as_ref() else {
return Ok(());
};
let flush_result = {
let channel = durable
.lock()
.map_err(|error| LiminalError::PublishFailed {
message: format!("durable channel state unavailable: {error}"),
})?;
block_on(channel.flush_store())
};
flush_result
.map_err(|error| LiminalError::PublishFailed {
message: format!(
"durable flush bridge for channel '{}' failed: {error}",
self.config.name
),
})?
.map_err(|error| LiminalError::PublishFailed {
message: format!(
"durable flush for channel '{}' failed: {error}",
self.config.name
),
})?;
Ok(())
}
/// Returns the length of the subscriber list reported by the actor core.
///
/// # Errors
/// Propagates errors from the core lookup and from the core's `list_subscribers`.
pub fn subscriber_count(&self) -> Result<usize, LiminalError> {
    let actor_core = self.core()?;
    let subscribers = actor_core.list_subscribers()?;
    Ok(subscribers.len())
}
/// Forwards to the actor core's `close`.
///
/// # Errors
/// Propagates errors from the core lookup and from `close` itself.
pub fn close(&self) -> Result<(), LiminalError> {
self.core()?.close()
}
/// Looks up the actor core through the shared actor state, passing this handle's configured
/// schema for the first-use spawn.
///
/// # Errors
/// Propagates any error from `ChannelActorState::core`.
fn core(&self) -> Result<Arc<ChannelActorCore>, LiminalError> {
self.actor.core(&self.config.schema)
}
/// Test-only: returns the pid the actor core currently reports.
///
/// # Errors
/// Propagates errors from the core lookup and from `current_pid`; returns
/// `LiminalError::DeliveryFailed` when the core reports no pid.
#[cfg(test)]
pub(crate) fn actor_pid(&self) -> Result<u64, LiminalError> {
    let actor_core = self.core()?;
    match actor_core.current_pid()? {
        Some(pid) => Ok(pid),
        None => Err(LiminalError::DeliveryFailed {
            message: "channel actor has no live pid".to_owned(),
        }),
    }
}
/// Test-only: returns a clone of the actor core's scheduler `Arc`.
///
/// # Errors
/// Propagates any error from the core lookup.
#[cfg(test)]
pub(crate) fn scheduler(&self) -> Result<Arc<beamr::scheduler::Scheduler>, LiminalError> {
Ok(Arc::clone(self.core()?.scheduler()))
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and `u64::MAX` when the
/// millisecond count does not fit in a `u64`.
fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}