use std::sync::Arc;
use nmp_core::substrate::{
ObservedProjection, ObservedProjectionRegistrar, SnapshotProjectionRegistrar,
};
use nmp_core::{ObservedProjectionId, ObservedProjectionSink};
use nmp_ownership::FrameworkProjectionKey;
use nmp_planner::InterestShape;
use crate::{ModulePolicy, ThreadingProjection};
pub const THREADING_GRAPH_PROJECTION_FAMILY_CLAIM: &str = "projection.nmp.threading.graph";
pub const THREADING_GRAPH_PROJECTION_KEY_PREFIX: &str = "nmp.threading.graph.";
pub const THREADING_GRAPH_SCHEMA_ID: &str = "nmp.threading.graph";
pub const THREADING_GRAPH_SESSION_ID_MAX_LEN: usize = 128;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ThreadingScope {
ActiveAccount,
Global,
}
impl ThreadingScope {
const fn code(self) -> u32 {
match self {
Self::ActiveAccount => 0,
Self::Global => 1,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ThreadingReadModelParams {
pub session_id: String,
pub shape: InterestShape,
pub scope: ThreadingScope,
pub replay_limit: usize,
pub policy: ModulePolicy,
}
impl ThreadingReadModelParams {
#[must_use]
pub fn global(session_id: impl Into<String>, shape: InterestShape) -> Self {
Self {
session_id: session_id.into(),
shape,
scope: ThreadingScope::Global,
replay_limit: 512,
policy: ModulePolicy::default(),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ThreadingReadModelHandle {
projection_key: String,
observer_id: ObservedProjectionId,
}
impl ThreadingReadModelHandle {
#[must_use]
pub fn projection_key(&self) -> &str {
&self.projection_key
}
#[must_use]
pub fn observer_id(&self) -> ObservedProjectionId {
self.observer_id
}
}
pub fn threading_projection_key(session_id: &str) -> Option<String> {
let suffix = session_id.trim();
if suffix.is_empty()
|| suffix.len() > THREADING_GRAPH_SESSION_ID_MAX_LEN
|| !suffix
.bytes()
.all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-'))
{
return None;
}
Some(format!("{THREADING_GRAPH_PROJECTION_KEY_PREFIX}{suffix}"))
}
pub fn open_threading_read_model(
app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
params: ThreadingReadModelParams,
) -> Option<ThreadingReadModelHandle> {
let projection_key = threading_projection_key(¶ms.session_id)?;
let registration_key =
FrameworkProjectionKey::declared(projection_key.clone(), "projection.nmp.threading.graph")
.ok()?;
let projection = Arc::new(ThreadingProjection::etag(params.policy));
let observer_id = app.open_observed_projection(ObservedProjection::from_shape(
Arc::clone(&projection) as Arc<dyn ObservedProjectionSink>,
projection_key.clone(),
params.scope.code(),
params.shape,
params.replay_limit,
));
if observer_id.0 == 0 {
return None;
}
let projection_for_typed = Arc::clone(&projection);
let projection_key_for_typed = projection_key.clone();
app.register_typed_snapshot_projection(registration_key, move || {
Some(projection_for_typed.typed_projection(&projection_key_for_typed))
});
Some(ThreadingReadModelHandle {
projection_key,
observer_id,
})
}
pub fn close_threading_read_model(
app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
handle: ThreadingReadModelHandle,
) {
app.close_observed_projection(handle.observer_id);
app.remove_snapshot_projection(&handle.projection_key);
}