Skip to main content

nmp_threading/
runtime.rs

1//! Host wiring for the `nmp.threading.graph.*` read model family.
2
3use std::sync::Arc;
4
5use nmp_core::substrate::{
6    ObservedProjection, ObservedProjectionRegistrar, SnapshotProjectionRegistrar,
7};
8use nmp_core::{ObservedProjectionId, ObservedProjectionSink};
9use nmp_ownership::FrameworkProjectionKey;
10use nmp_planner::InterestShape;
11
12use crate::{ModulePolicy, ThreadingProjection};
13
/// Owner claim for the projection family that every dynamic threading graph key
/// is declared under.
pub const THREADING_GRAPH_PROJECTION_FAMILY_CLAIM: &str = "projection.nmp.threading.graph";
/// Leading part of every dynamic projection key; [`threading_projection_key`]
/// appends the validated session suffix to it.
pub const THREADING_GRAPH_PROJECTION_KEY_PREFIX: &str = "nmp.threading.graph.";
/// Stable schema id carried in typed-projection envelopes.
pub const THREADING_GRAPH_SCHEMA_ID: &str = "nmp.threading.graph";
/// Largest byte length a caller-supplied session suffix may have (measured after
/// trimming) before [`threading_projection_key`] rejects it.
pub const THREADING_GRAPH_SESSION_ID_MAX_LEN: usize = 128;
22
/// Account routing scope for the observed projection.
///
/// Each variant's discriminant is the numeric scope code returned by `code`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ThreadingScope {
    /// Scope code `0`.
    ActiveAccount = 0,
    /// Scope code `1`.
    Global = 1,
}

impl ThreadingScope {
    /// Numeric scope code for this variant (`ActiveAccount` is 0, `Global` is 1).
    const fn code(self) -> u32 {
        // The explicit discriminants above are the codes, so a plain cast suffices.
        self as u32
    }
}
38
39/// Parameters for one open threading graph read model.
40#[derive(Clone, Debug, Eq, PartialEq)]
41pub struct ThreadingReadModelParams {
42    /// Caller-stable id used to derive `nmp.threading.graph.<session_id>`.
43    pub session_id: String,
44    /// Event scope to observe and replay.
45    pub shape: InterestShape,
46    /// Routing scope for the observed projection.
47    pub scope: ThreadingScope,
48    /// Maximum cached events replayed before live activation.
49    pub replay_limit: usize,
50    /// Grouping policy for the emitted block layout.
51    pub policy: ModulePolicy,
52}
53
54impl ThreadingReadModelParams {
55    #[must_use]
56    pub fn global(session_id: impl Into<String>, shape: InterestShape) -> Self {
57        Self {
58            session_id: session_id.into(),
59            shape,
60            scope: ThreadingScope::Global,
61            replay_limit: 512,
62            policy: ModulePolicy::default(),
63        }
64    }
65}
66
67/// Handle returned by [`open_threading_read_model`].
68#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct ThreadingReadModelHandle {
70    projection_key: String,
71    observer_id: ObservedProjectionId,
72}
73
74impl ThreadingReadModelHandle {
75    #[must_use]
76    pub fn projection_key(&self) -> &str {
77        &self.projection_key
78    }
79
80    #[must_use]
81    pub fn observer_id(&self) -> ObservedProjectionId {
82        self.observer_id
83    }
84}
85
86/// Build the framework-owned projection key for `session_id`.
87///
88/// The suffix is intentionally conservative so a session id cannot smuggle a
89/// second namespace segment with whitespace or control characters, and so the
90/// snapshot registry cannot be fed unbounded caller-owned keys.
91pub fn threading_projection_key(session_id: &str) -> Option<String> {
92    let suffix = session_id.trim();
93    if suffix.is_empty()
94        || suffix.len() > THREADING_GRAPH_SESSION_ID_MAX_LEN
95        || !suffix
96            .bytes()
97            .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'.' | b'_' | b'-'))
98    {
99        return None;
100    }
101    Some(format!("{THREADING_GRAPH_PROJECTION_KEY_PREFIX}{suffix}"))
102}
103
104/// Open a reactive threading graph projection against a caller-supplied scope.
105pub fn open_threading_read_model(
106    app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
107    params: ThreadingReadModelParams,
108) -> Option<ThreadingReadModelHandle> {
109    let projection_key = threading_projection_key(&params.session_id)?;
110    let registration_key =
111        FrameworkProjectionKey::declared(projection_key.clone(), "projection.nmp.threading.graph")
112            .ok()?;
113
114    let projection = Arc::new(ThreadingProjection::etag(params.policy));
115    let observer_id = app.open_observed_projection(ObservedProjection::from_shape(
116        Arc::clone(&projection) as Arc<dyn ObservedProjectionSink>,
117        projection_key.clone(),
118        params.scope.code(),
119        params.shape,
120        params.replay_limit,
121    ));
122    if observer_id.0 == 0 {
123        return None;
124    }
125
126    let projection_for_typed = Arc::clone(&projection);
127    let projection_key_for_typed = projection_key.clone();
128    app.register_typed_snapshot_projection(registration_key, move || {
129        Some(projection_for_typed.typed_projection(&projection_key_for_typed))
130    });
131
132    Some(ThreadingReadModelHandle {
133        projection_key,
134        observer_id,
135    })
136}
137
138/// Close a threading read model opened by [`open_threading_read_model`].
139pub fn close_threading_read_model(
140    app: &(impl ObservedProjectionRegistrar + SnapshotProjectionRegistrar),
141    handle: ThreadingReadModelHandle,
142) {
143    app.close_observed_projection(handle.observer_id);
144    app.remove_snapshot_projection(&handle.projection_key);
145}