1use std::sync::Arc;
4
5use nmp_core::substrate::{
6 ObservedProjection, ObservedProjectionRegistrar, SnapshotProjectionRegistrar,
7};
8use nmp_core::{ObservedProjectionId, ObservedProjectionSink};
9use nmp_ownership::FrameworkProjectionKey;
10use nmp_planner::InterestShape;
11
12use crate::{ModulePolicy, ThreadingProjection};
13
14pub const THREADING_GRAPH_PROJECTION_FAMILY_CLAIM: &str = "projection.nmp.threading.graph";
16pub const THREADING_GRAPH_PROJECTION_KEY_PREFIX: &str = "nmp.threading.graph.";
18pub const THREADING_GRAPH_SCHEMA_ID: &str = "nmp.threading.graph";
20pub const THREADING_GRAPH_SESSION_ID_MAX_LEN: usize = 128;
22
23#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum ThreadingScope {
26 ActiveAccount,
27 Global,
28}
29
30impl ThreadingScope {
31 const fn code(self) -> u32 {
32 match self {
33 Self::ActiveAccount => 0,
34 Self::Global => 1,
35 }
36 }
37}
38
39#[derive(Clone, Debug, Eq, PartialEq)]
41pub struct ThreadingReadModelParams {
42 pub session_id: String,
44 pub shape: InterestShape,
46 pub scope: ThreadingScope,
48 pub replay_limit: usize,
50 pub policy: ModulePolicy,
52}
53
54impl ThreadingReadModelParams {
55 #[must_use]
56 pub fn global(session_id: impl Into<String>, shape: InterestShape) -> Self {
57 Self {
58 session_id: session_id.into(),
59 shape,
60 scope: ThreadingScope::Global,
61 replay_limit: 512,
62 policy: ModulePolicy::default(),
63 }
64 }
65}
66
67#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct ThreadingReadModelHandle {
70 projection_key: String,
71 observer_id: ObservedProjectionId,
72}
73
74impl ThreadingReadModelHandle {
75 #[must_use]
76 pub fn projection_key(&self) -> &str {
77 &self.projection_key
78 }
79
80 #[must_use]
81 pub fn observer_id(&self) -> ObservedProjectionId {
82 self.observer_id
83 }
84}
85
86pub fn threading_projection_key(session_id: &str) -> Option<String> {
92 let suffix = session_id.trim();
93 if suffix.is_empty()
94 || suffix.len() > THREADING_GRAPH_SESSION_ID_MAX_LEN
95 || !suffix
96 .bytes()
97 .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-'))
98 {
99 return None;
100 }
101 Some(format!("{THREADING_GRAPH_PROJECTION_KEY_PREFIX}{suffix}"))
102}
103
104pub fn open_threading_read_model(
106 app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
107 params: ThreadingReadModelParams,
108) -> Option<ThreadingReadModelHandle> {
109 let projection_key = threading_projection_key(¶ms.session_id)?;
110 let registration_key =
111 FrameworkProjectionKey::declared(projection_key.clone(), "projection.nmp.threading.graph")
112 .ok()?;
113
114 let projection = Arc::new(ThreadingProjection::etag(params.policy));
115 let observer_id = app.open_observed_projection(ObservedProjection::from_shape(
116 Arc::clone(&projection) as Arc<dyn ObservedProjectionSink>,
117 projection_key.clone(),
118 params.scope.code(),
119 params.shape,
120 params.replay_limit,
121 ));
122 if observer_id.0 == 0 {
123 return None;
124 }
125
126 let projection_for_typed = Arc::clone(&projection);
127 let projection_key_for_typed = projection_key.clone();
128 app.register_typed_snapshot_projection(registration_key, move || {
129 Some(projection_for_typed.typed_projection(&projection_key_for_typed))
130 });
131
132 Some(ThreadingReadModelHandle {
133 projection_key,
134 observer_id,
135 })
136}
137
138pub fn close_threading_read_model(
140 app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
141 handle: ThreadingReadModelHandle,
142) {
143 app.close_observed_projection(handle.observer_id);
144 app.remove_snapshot_projection(&handle.projection_key);
145}