use std::collections::HashMap;
use std::sync::Mutex;
use crate::channel::supervisor::ChannelSupervisor;
use crate::channel::types::{ChannelConfig, ChannelHandle};
use crate::error::LiminalError;
/// Point-in-time description of a single registered channel.
///
/// Values of this type are produced by `ChannelRegistry::list`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelSummary {
    /// Name under which the channel is registered.
    pub name: String,
    /// Subscriber count reported for the channel when the summary was built.
    pub subscriber_count: usize,
}
/// Name-indexed collection of channel handles sharing one supervisor.
///
/// The map of handles sits behind a [`Mutex`], so every method of the
/// registry takes `&self`.
#[derive(Debug)]
pub struct ChannelRegistry {
// Supervisor that is cloned into every handle created through this registry.
supervisor: ChannelSupervisor,
// Registered channels keyed by channel name.
channels: Mutex<HashMap<String, ChannelHandle>>,
}
impl ChannelRegistry {
pub fn new() -> Result<Self, LiminalError> {
Ok(Self {
supervisor: ChannelSupervisor::new()?,
channels: Mutex::new(HashMap::new()),
})
}
#[must_use]
pub fn with_supervisor(supervisor: ChannelSupervisor) -> Self {
Self {
supervisor,
channels: Mutex::new(HashMap::new()),
}
}
pub fn create(&self, config: ChannelConfig) -> Result<ChannelHandle, LiminalError> {
let mut channels = self.lock()?;
if channels.contains_key(&config.name) {
return Err(LiminalError::PublishFailed {
message: format!("channel '{}' already exists", config.name),
});
}
let name = config.name.clone();
let handle = ChannelHandle::with_supervisor(config, self.supervisor.clone());
channels.insert(name, handle.clone());
drop(channels);
Ok(handle)
}
pub fn lookup(&self, name: &str) -> Result<Option<ChannelHandle>, LiminalError> {
Ok(self.lock()?.get(name).cloned())
}
pub fn list(&self) -> Result<Vec<ChannelSummary>, LiminalError> {
let snapshot: Vec<(String, ChannelHandle)> = {
let channels = self.lock()?;
channels
.iter()
.map(|(name, handle)| (name.clone(), handle.clone()))
.collect()
};
let mut summaries = snapshot
.into_iter()
.map(|(name, handle)| ChannelSummary {
subscriber_count: handle.subscriber_count().unwrap_or(0),
name,
})
.collect::<Vec<_>>();
summaries.sort_by(|left, right| left.name.cmp(&right.name));
Ok(summaries)
}
pub fn close(&self, name: &str) -> Result<bool, LiminalError> {
let handle = self.lock()?.remove(name);
match handle {
Some(handle) => {
handle.close()?;
Ok(true)
}
None => Ok(false),
}
}
pub fn shutdown(&self) {
self.supervisor.shutdown();
}
fn lock(
&self,
) -> Result<std::sync::MutexGuard<'_, HashMap<String, ChannelHandle>>, LiminalError> {
self.channels
.lock()
.map_err(|error| LiminalError::PublishFailed {
message: format!("channel registry lock poisoned: {error}"),
})
}
}