use std::sync::{Mutex, mpsc};
use std::time::Instant;
use super::sync::lock;
use crate::conversation::types::{ParticipantHealth, ParticipantPid, ParticipantStatus};
use crate::error::LiminalError;
#[derive(Debug, Default)]
pub(super) struct ExitNotifierRegistry {
notifiers: Mutex<Vec<(ParticipantPid, mpsc::SyncSender<Instant>)>>,
}
impl ExitNotifierRegistry {
pub(super) fn register(
&self,
participant: ParticipantPid,
notifier: mpsc::SyncSender<Instant>,
participants: &[ParticipantStatus],
) -> Result<(), LiminalError> {
let already_exited_at = participants.iter().find_map(|status| {
(status.participant == participant && status.health == ParticipantHealth::Dead)
.then(|| status.exited_at.unwrap_or_else(Instant::now))
});
if let Some(exited_at) = already_exited_at {
let _ = notifier.try_send(exited_at);
return Ok(());
}
lock(&self.notifiers, "actor exit notifiers")?.push((participant, notifier));
Ok(())
}
pub(super) fn signal(
&self,
participant: ParticipantPid,
observed_at: Instant,
) -> Result<(), LiminalError> {
let matched = {
let mut notifiers = lock(&self.notifiers, "actor exit notifiers")?;
let mut matched = Vec::new();
let mut retained = Vec::with_capacity(notifiers.len());
for (registered, notifier) in notifiers.drain(..) {
if registered == participant {
matched.push(notifier);
} else {
retained.push((registered, notifier));
}
}
*notifiers = retained;
matched
};
for notifier in matched {
let _ = notifier.try_send(observed_at);
}
Ok(())
}
}