Skip to main content

nmp_threading/
projection.rs

1//! Reactive threading graph projection.
2//!
3//! `ThreadingProjection` is the read-model half of the threading mechanism:
4//! callers open an observed projection for a concrete event scope, this sink
5//! folds accepted events through a [`ParentResolver`], and the snapshot side
6//! emits typed edge rows plus the grouped block layout.
7
8use std::sync::Mutex;
9
10use nmp_core::substrate::{BoundedMessageMap, KernelEvent, MAX_PROJECTION_MESSAGES};
11use nmp_core::{ObservedProjectionSink, TypedProjectionData};
12use serde::{Deserialize, Serialize};
13
14use crate::runtime::THREADING_GRAPH_SCHEMA_ID;
15use crate::wire::{
16    encode_threading_snapshot, THREADING_GRAPH_FILE_IDENTIFIER, THREADING_GRAPH_SCHEMA_VERSION,
17};
18use crate::{
19    EtagThreadResolver, Grouper, ModulePolicy, ParentResolver, ThreadPointer, TimelineBlock,
20};
21
22/// Per-event threading facts keyed by event id.
23#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
24pub struct ThreadEdge {
25    /// Event id this row describes.
26    pub event_id: String,
27    /// Author pubkey copied from the event.
28    pub author_pubkey: String,
29    /// Raw event kind copied from the event. The projection does not branch on it.
30    pub kind: u32,
31    /// Event creation time as Unix seconds.
32    pub created_at: u64,
33    /// Direct parent resolved from `e` tag grammar.
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub parent: Option<ThreadPointer>,
36    /// Thread root resolved from `e` tag grammar.
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub root: Option<ThreadPointer>,
39    /// Best-effort parent-author hint, usually the first accompanying `p` tag.
40    #[serde(default, skip_serializing_if = "Option::is_none")]
41    pub parent_author_pubkey: Option<String>,
42}
43
44/// Typed snapshot emitted by the `nmp.threading.graph.*` projection family.
45#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
46pub struct ThreadingSnapshot {
47    /// Stable edge rows ordered by event id.
48    pub edges: Vec<ThreadEdge>,
49    /// Display-order grouping blocks, newest block first.
50    pub blocks: Vec<TimelineBlock>,
51    /// Ancestor event ids the local scope references but has not seen.
52    #[serde(default, skip_serializing_if = "Vec::is_empty")]
53    pub pending_ancestor_ids: Vec<String>,
54}
55
56impl ThreadingSnapshot {
57    #[must_use]
58    pub fn empty() -> Self {
59        Self::default()
60    }
61}
62
63/// Kind-blind observed projection over a scoped event stream.
64pub struct ThreadingProjection<R: ParentResolver = EtagThreadResolver> {
65    state: Mutex<ThreadingState<R>>,
66}
67
68struct ThreadingState<R: ParentResolver> {
69    resolver: R,
70    grouper: Grouper<R>,
71    edges: BoundedMessageMap<String, ThreadEdge>,
72}
73
74impl ThreadingProjection<EtagThreadResolver> {
75    /// Construct the default e-tag threading projection.
76    #[must_use]
77    pub fn etag(policy: ModulePolicy) -> Self {
78        Self::new(EtagThreadResolver, policy, MAX_PROJECTION_MESSAGES)
79    }
80}
81
82impl<R: ParentResolver + Clone> ThreadingProjection<R> {
83    /// Construct a projection using a caller-supplied resolver.
84    #[must_use]
85    pub fn new(resolver: R, policy: ModulePolicy, capacity: usize) -> Self {
86        let grouper = Grouper::new(resolver.clone(), policy);
87        Self {
88            state: Mutex::new(ThreadingState {
89                resolver,
90                grouper,
91                edges: BoundedMessageMap::new(capacity),
92            }),
93        }
94    }
95
96    /// Return a typed snapshot. D6: poisoned state degrades to empty.
97    #[must_use]
98    pub fn snapshot(&self) -> ThreadingSnapshot {
99        self.state
100            .lock()
101            .map(|state| state.snapshot())
102            .unwrap_or_else(|_| ThreadingSnapshot::empty())
103    }
104
105    /// Build typed projection data for `projection_key`.
106    #[must_use]
107    pub fn typed_projection(&self, projection_key: &str) -> TypedProjectionData {
108        let snapshot = self.snapshot();
109        TypedProjectionData {
110            key: projection_key.to_string(),
111            schema_id: THREADING_GRAPH_SCHEMA_ID.to_string(),
112            schema_version: THREADING_GRAPH_SCHEMA_VERSION,
113            file_identifier: String::from_utf8_lossy(THREADING_GRAPH_FILE_IDENTIFIER).into_owned(),
114            payload: encode_threading_snapshot(&snapshot),
115            ..Default::default()
116        }
117    }
118}
119
120impl<R: ParentResolver + Clone> ObservedProjectionSink for ThreadingProjection<R> {
121    fn on_kernel_event(&self, event: &KernelEvent) {
122        if let Ok(mut state) = self.state.lock() {
123            state.ingest(event);
124        }
125    }
126}
127
128impl<R: ParentResolver + Clone> ThreadingState<R> {
129    fn ingest(&mut self, event: &KernelEvent) {
130        let known = self.edges.contains_key(event.id.as_str());
131        if !known {
132            self.evict_oldest_if_full();
133        }
134        let edge = ThreadEdge {
135            event_id: event.id.clone(),
136            author_pubkey: event.author.clone(),
137            kind: event.kind,
138            created_at: event.created_at,
139            parent: self.resolver.parent(event),
140            root: self.resolver.root(event),
141            parent_author_pubkey: self.resolver.parent_author(event),
142        };
143        if known {
144            let _ = self.grouper.on_replace(&event.id, event);
145        } else {
146            let _ = self.grouper.on_insert(event);
147        }
148        self.edges.insert(edge.event_id.clone(), edge);
149    }
150
151    fn evict_oldest_if_full(&mut self) {
152        if self.edges.len() < self.edges.capacity() {
153            return;
154        }
155        let Some((oldest_id, _)) = self.edges.first() else {
156            return;
157        };
158        let oldest_id = oldest_id.clone();
159        self.edges.remove(oldest_id.as_str());
160        let _ = self.grouper.on_remove(&oldest_id);
161    }
162
163    fn snapshot(&self) -> ThreadingSnapshot {
164        let mut edges: Vec<ThreadEdge> = self.edges.values().cloned().collect();
165        edges.sort_by(|a, b| a.event_id.cmp(&b.event_id));
166        ThreadingSnapshot {
167            edges,
168            blocks: self.grouper.blocks().to_vec(),
169            pending_ancestor_ids: self
170                .grouper
171                .pending_ancestor_ids()
172                .iter()
173                .cloned()
174                .collect(),
175        }
176    }
177}