use std::sync::OnceLock;
use tokio::sync::mpsc;
use crate::session_manager::{SavedSession, SessionManager};
use crate::utils::spawn_supervised;
#[derive(Debug)]
pub enum PersistRequest {
Checkpoint(SavedSession),
SessionSnapshot(SavedSession),
ClearCheckpoint,
Shutdown,
}
#[derive(Debug, Clone)]
pub struct PersistActorHandle {
tx: mpsc::UnboundedSender<PersistRequest>,
}
impl PersistActorHandle {
pub fn try_send(&self, request: PersistRequest) {
let _ = self.tx.send(request);
}
}
static ACTOR_TX: OnceLock<PersistActorHandle> = OnceLock::new();
pub fn init_actor(handle: PersistActorHandle) {
let _ = ACTOR_TX.set(handle);
}
pub fn persist(request: PersistRequest) {
if let Some(handle) = ACTOR_TX.get() {
handle.try_send(request);
}
}
pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
let (tx, mut rx) = mpsc::unbounded_channel::<PersistRequest>();
let handle = PersistActorHandle { tx };
spawn_supervised(
"persistence-actor",
std::panic::Location::caller(),
async move {
let mut latest_checkpoint: Option<SavedSession> = None;
let mut latest_session: Option<SavedSession> = None;
let mut should_clear: bool = false;
loop {
while let Ok(req) = rx.try_recv() {
match req {
PersistRequest::Checkpoint(session) => {
latest_checkpoint = Some(session);
}
PersistRequest::SessionSnapshot(session) => {
latest_session = Some(session);
}
PersistRequest::ClearCheckpoint => {
should_clear = true;
}
PersistRequest::Shutdown => {
flush_inner(
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
should_clear,
);
return;
}
}
}
if should_clear {
let _ = manager.clear_checkpoint();
should_clear = false;
}
if let Some(ref session) = latest_checkpoint.take() {
let _ = manager.save_checkpoint(session);
}
if let Some(ref session) = latest_session.take() {
let _ = manager.save_session(session);
}
match rx.recv().await {
Some(PersistRequest::Checkpoint(session)) => {
latest_checkpoint = Some(session);
}
Some(PersistRequest::SessionSnapshot(session)) => {
latest_session = Some(session);
}
Some(PersistRequest::ClearCheckpoint) => {
should_clear = true;
}
Some(PersistRequest::Shutdown) => {
flush_inner(
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
should_clear,
);
return;
}
None => {
flush_inner(
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
should_clear,
);
return;
}
}
}
},
);
handle
}
fn flush_inner(
manager: &SessionManager,
checkpoint: Option<&SavedSession>,
session: Option<&SavedSession>,
should_clear: bool,
) {
if should_clear {
let _ = manager.clear_checkpoint();
}
if let Some(s) = checkpoint {
let _ = manager.save_checkpoint(s);
}
if let Some(s) = session {
let _ = manager.save_session(s);
}
}