use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use kanal::unbounded;
use crate::command::OnComplete;
/// A boxed, one-shot closure that mutates a `D` in place; `Send` so it can be
/// handed to another thread or task.
type Update<D> = Box<dyn FnOnce(&mut D) + Send>;
/// Sending half of the `kanal` channel that carries [`Update`] closures.
type UpdateSender<D> = kanal::Sender<Update<D>>;
/// Receiving half of the `kanal` channel that carries [`Update`] closures.
type UpdateReceiver<D> = kanal::Receiver<Update<D>>;
/// A shared, reusable callback taking `&mut D` and a value of type `T`.
type Setter<D, T> = Arc<dyn Fn(&mut D, T) + Send + Sync>;
/// Write side of the shared domain data: pairs the shared `ArcSwap` with the
/// receiving end of the update channel. Consumed by `run`.
pub struct DomainWriteHandle<D> {
/// Shared cell that `run` loads the current value from and stores new values into.
swap: Arc<ArcSwap<D>>,
/// Receiving end of the channel on which queued updates arrive.
rx: UpdateReceiver<D>,
/// How long `run` sleeps after the first update of a batch before collecting
/// whatever else has been queued.
coalesce_interval: Duration,
}
impl<D> DomainWriteHandle<D>
where
    D: Clone + Send + 'static,
{
    /// Runs the write loop until receiving from the update channel fails.
    ///
    /// Each iteration:
    /// 1. awaits one update (returning from the function if `recv` yields `Err`
    ///    — presumably because the channel was closed; confirm against kanal),
    /// 2. collects every update that is immediately available,
    /// 3. sleeps for `coalesce_interval`,
    /// 4. collects again,
    /// 5. clones the current value, applies the collected updates to the clone
    ///    in the order they were received, and stores the result.
    pub async fn run(self) {
        loop {
            let head = match self.rx.as_async().recv().await {
                Ok(update) => update,
                Err(_) => return,
            };

            let mut pending: Vec<Update<D>> = Vec::new();
            pending.push(head);
            self.drain_queued(&mut pending);
            tokio::time::sleep(self.coalesce_interval).await;
            self.drain_queued(&mut pending);

            // Keep the guard inside this block so it is released before the
            // updates run and before the next `.await`.
            let mut next = {
                let snapshot = self.swap.load();
                (**snapshot).clone()
            };
            pending.into_iter().for_each(|apply| apply(&mut next));
            self.swap.store(Arc::new(next));
        }
    }

    /// Appends to `batch` every update `try_recv` hands out right now, stopping
    /// at the first `Ok(None)` or `Err`.
    fn drain_queued(&self, batch: &mut Vec<Update<D>>) {
        batch.extend(std::iter::from_fn(|| self.rx.try_recv().ok().flatten()));
    }
}
/// Handle to the shared domain data: holds the shared `ArcSwap` for reads and
/// the sending end of the update channel for queuing modifications.
#[derive(Debug)]
pub struct SharedDomainData<D> {
/// Shared cell holding the current value; the same `Arc` is given to the
/// `DomainWriteHandle` created alongside this handle.
swap: Arc<ArcSwap<D>>,
/// Sending end of the channel on which update closures are queued.
write_tx: UpdateSender<D>,
}
impl<D> Clone for SharedDomainData<D>
where
D: Clone,
{
fn clone(&self) -> Self {
Self {
swap: self.swap.clone(),
write_tx: self.write_tx.clone(),
}
}
}
impl<D> SharedDomainData<D>
where
D: Clone + Send + Sync + 'static,
{
pub fn new(initial: D) -> (Self, DomainWriteHandle<D>) {
Self::with_coalesce(initial, Duration::from_micros(500))
}
pub fn with_coalesce(initial: D, coalesce_interval: Duration) -> (Self, DomainWriteHandle<D>) {
let (tx, rx) = unbounded();
let swap = Arc::new(ArcSwap::from_pointee(initial));
(
Self {
swap: swap.clone(),
write_tx: tx,
},
DomainWriteHandle {
swap,
rx,
coalesce_interval,
},
)
}
pub fn read(&self) -> arc_swap::Guard<Arc<D>> {
self.swap.load()
}
pub fn modify(&self, f: impl FnOnce(&mut D) + Send + 'static) {
let _ = self.write_tx.send(Box::new(f));
}
pub fn handler<T>(
&self,
setter: impl Fn(&mut D, T) + Send + Sync + 'static,
) -> DomainHandler<D, T> {
DomainHandler {
write_tx: self.write_tx.clone(),
setter: Arc::new(setter),
}
}
pub fn handler_noop<T>(&self) -> DomainHandler<D, T>
where
T: Send + 'static,
{
self.handler(|_, _| {})
}
pub fn bind<S>(
&self,
services: S,
rt: tokio::runtime::Handle,
) -> crate::executor::DomainExecutor<D, S>
where
S: Clone + Send + 'static,
{
crate::executor::DomainExecutor {
domain: self.clone(),
services,
rt,
error_handler: None,
}
}
pub fn stream(&self, rt: tokio::runtime::Handle) -> crate::stream::StreamExecutor<D> {
crate::stream::StreamExecutor {
domain: self.clone(),
rt,
}
}
}
/// Pairs an update-channel sender with a setter callback; when run with a
/// value of type `T`, it queues an update that calls the setter with that value.
pub struct DomainHandler<D, T> {
/// Sending end of the channel on which the generated update is queued.
write_tx: UpdateSender<D>,
/// Shared callback invoked with the domain data and the completed value.
setter: Setter<D, T>,
}
impl<D, T> Clone for DomainHandler<D, T> {
fn clone(&self) -> Self {
Self {
write_tx: self.write_tx.clone(),
setter: self.setter.clone(),
}
}
}
impl<D, T> OnComplete<T> for DomainHandler<D, T>
where
    D: Send + 'static,
    T: Send + 'static,
{
    /// Wraps `value` and the stored setter into a one-shot update and queues
    /// it on the update channel.
    ///
    /// The result of the send is discarded, so a failed send is silent.
    fn run(self, value: T) {
        let Self { write_tx, setter } = self;
        // `value` is moved into the closure, which makes it callable only once.
        let apply = move |data: &mut D| setter(data, value);
        let _ = write_tx.send(Box::new(apply));
    }
}