use std::sync::{Arc, RwLock, Weak};
use meerkat_core::handles::{
DslTransitionError, SessionContextAdvancedObserver, SessionContextHandle,
};
use super::HandleDslAuthority;
use crate::meerkat_machine::dsl as mm_dsl;
/// Session-context handle that routes its operations through a shared
/// [`HandleDslAuthority`] and tracks at most one installed
/// [`SessionContextAdvancedObserver`].
pub struct RuntimeSessionContextHandle {
/// Shared DSL authority; inputs are applied to it and state is read from it.
dsl: Arc<HandleDslAuthority>,
/// Currently installed observer, if any. It is stored as a `Weak`, so this
/// handle never keeps the observer alive on its own; the slot starts as `None`.
observer: RwLock<Option<Weak<dyn SessionContextAdvancedObserver>>>,
}
impl std::fmt::Debug for RuntimeSessionContextHandle {
    /// Formats the handle as a struct with its `dsl` authority and an
    /// `observer` field.
    ///
    /// The observer is rendered as the placeholder `Some("<observer>")` when
    /// the slot is occupied and `None` when it is empty; the stored weak
    /// reference itself is never formatted or upgraded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Recover from a poisoned lock the same way every other accessor in
        // this file does, so diagnostics still report an installed observer
        // instead of silently printing `None`.
        let observer_tag = self
            .observer
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .as_ref()
            .map(|_| "<observer>");
        f.debug_struct("RuntimeSessionContextHandle")
            .field("dsl", &self.dsl)
            .field("observer", &observer_tag)
            .finish()
    }
}
impl RuntimeSessionContextHandle {
    /// Creates a handle backed by the given shared DSL authority.
    ///
    /// The handle starts without any observer installed.
    pub fn new(dsl: Arc<HandleDslAuthority>) -> Self {
        let observer = RwLock::new(None);
        Self { dsl, observer }
    }

    /// Creates a handle backed by a freshly constructed
    /// `HandleDslAuthority::ephemeral()` authority that is owned solely by
    /// the returned handle.
    pub fn ephemeral() -> Self {
        let authority = HandleDslAuthority::ephemeral();
        Self::new(Arc::new(authority))
    }
}
impl SessionContextHandle for RuntimeSessionContextHandle {
    /// Applies an `AdvanceSessionContext` input carrying `updated_at_ms` to
    /// the DSL authority and notifies the installed observer.
    ///
    /// Returns `Ok(true)` when the input was applied, `Ok(false)` when the
    /// authority reports a guard rejection, and `Err` for any other
    /// transition error.
    ///
    /// The observer (if still alive) is called once per
    /// `SessionContextAdvanced` effect, after the apply call has returned.
    fn context_advanced(&self, updated_at_ms: u64) -> Result<bool, DslTransitionError> {
        let input = mm_dsl::MeerkatMachineInput::AdvanceSessionContext { updated_at_ms };
        let outcome = self.dsl.apply_input_with_effects_and_sample(
            input,
            "SessionContextHandle::context_advanced",
            |effects| {
                // Sample the observer inside the callback so the observer and
                // the emitted effects are captured together.
                // NOTE(review): presumably this callback runs while the
                // authority still holds its state lock — confirm.
                let live_observer = self
                    .observer
                    .read()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .as_ref()
                    .and_then(Weak::upgrade);
                let advanced_marks: Vec<u64> = effects
                    .iter()
                    .filter_map(|effect| {
                        if let mm_dsl::MeerkatMachineEffect::SessionContextAdvanced {
                            updated_at_ms: mark,
                        } = effect
                        {
                            Some(*mark)
                        } else {
                            None
                        }
                    })
                    .collect();
                (live_observer, advanced_marks)
            },
        );

        let (live_observer, advanced_marks) = match outcome {
            Ok(sampled) => sampled,
            Err(err) => {
                // A guard rejection is reported as "not advanced", not as an error.
                return if err.is_guard_rejected() {
                    Ok(false)
                } else {
                    Err(err)
                };
            }
        };

        // Notify only after the apply call has returned; emissions are
        // dropped when no live observer was sampled.
        if let Some(observer) = live_observer {
            advanced_marks.into_iter().for_each(|mark| {
                observer.on_session_context_advanced(mark);
            });
        }
        Ok(true)
    }

    /// Returns `last_session_context_updated_at_ms` from a snapshot of the
    /// authority's state.
    fn current_watermark_ms(&self) -> u64 {
        let snapshot = self.dsl.snapshot_state();
        snapshot.last_session_context_updated_at_ms
    }

    /// Installs `observer`, replacing any previously installed one.
    ///
    /// Only a weak reference is stored, so the caller must keep the `Arc`
    /// alive for the observer to keep receiving notifications.
    fn install_observer(&self, observer: Arc<dyn SessionContextAdvancedObserver>) {
        let mut slot = self
            .observer
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = Some(Arc::downgrade(&observer));
    }

    /// Installs `observer` from inside the authority's `with_state_lock`
    /// callback and returns the `last_session_context_updated_at_ms` seen
    /// there as the baseline.
    ///
    /// NOTE(review): presumably `with_state_lock` holds the state lock for
    /// the duration of the callback, making install + baseline read atomic
    /// with respect to `context_advanced` — confirm.
    fn install_observer_with_baseline(
        &self,
        observer: Arc<dyn SessionContextAdvancedObserver>,
    ) -> u64 {
        self.dsl.with_state_lock(|state| {
            {
                let mut slot = self
                    .observer
                    .write()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                *slot = Some(Arc::downgrade(&observer));
            }
            state.last_session_context_updated_at_ms
        })
    }
}