nmp_threading/
projection.rs1use std::sync::Mutex;
9
10use nmp_core::substrate::{BoundedMessageMap, KernelEvent, MAX_PROJECTION_MESSAGES};
11use nmp_core::{ObservedProjectionSink, TypedProjectionData};
12use serde::{Deserialize, Serialize};
13
14use crate::runtime::THREADING_GRAPH_SCHEMA_ID;
15use crate::wire::{
16 encode_threading_snapshot, THREADING_GRAPH_FILE_IDENTIFIER, THREADING_GRAPH_SCHEMA_VERSION,
17};
18use crate::{
19 EtagThreadResolver, Grouper, ModulePolicy, ParentResolver, ThreadPointer, TimelineBlock,
20};
21
22#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
24pub struct ThreadEdge {
25 pub event_id: String,
27 pub author_pubkey: String,
29 pub kind: u32,
31 pub created_at: u64,
33 #[serde(default, skip_serializing_if = "Option::is_none")]
35 pub parent: Option<ThreadPointer>,
36 #[serde(default, skip_serializing_if = "Option::is_none")]
38 pub root: Option<ThreadPointer>,
39 #[serde(default, skip_serializing_if = "Option::is_none")]
41 pub parent_author_pubkey: Option<String>,
42}
43
44#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
46pub struct ThreadingSnapshot {
47 pub edges: Vec<ThreadEdge>,
49 pub blocks: Vec<TimelineBlock>,
51 #[serde(default, skip_serializing_if = "Vec::is_empty")]
53 pub pending_ancestor_ids: Vec<String>,
54}
55
56impl ThreadingSnapshot {
57 #[must_use]
58 pub fn empty() -> Self {
59 Self::default()
60 }
61}
62
63pub struct ThreadingProjection<R: ParentResolver = EtagThreadResolver> {
65 state: Mutex<ThreadingState<R>>,
66}
67
68struct ThreadingState<R: ParentResolver> {
69 resolver: R,
70 grouper: Grouper<R>,
71 edges: BoundedMessageMap<String, ThreadEdge>,
72}
73
74impl ThreadingProjection<EtagThreadResolver> {
75 #[must_use]
77 pub fn etag(policy: ModulePolicy) -> Self {
78 Self::new(EtagThreadResolver, policy, MAX_PROJECTION_MESSAGES)
79 }
80}
81
82impl<R: ParentResolver + Clone> ThreadingProjection<R> {
83 #[must_use]
85 pub fn new(resolver: R, policy: ModulePolicy, capacity: usize) -> Self {
86 let grouper = Grouper::new(resolver.clone(), policy);
87 Self {
88 state: Mutex::new(ThreadingState {
89 resolver,
90 grouper,
91 edges: BoundedMessageMap::new(capacity),
92 }),
93 }
94 }
95
96 #[must_use]
98 pub fn snapshot(&self) -> ThreadingSnapshot {
99 self.state
100 .lock()
101 .map(|state| state.snapshot())
102 .unwrap_or_else(|_| ThreadingSnapshot::empty())
103 }
104
105 #[must_use]
107 pub fn typed_projection(&self, projection_key: &str) -> TypedProjectionData {
108 let snapshot = self.snapshot();
109 TypedProjectionData {
110 key: projection_key.to_string(),
111 schema_id: THREADING_GRAPH_SCHEMA_ID.to_string(),
112 schema_version: THREADING_GRAPH_SCHEMA_VERSION,
113 file_identifier: String::from_utf8_lossy(THREADING_GRAPH_FILE_IDENTIFIER).into_owned(),
114 payload: encode_threading_snapshot(&snapshot),
115 ..Default::default()
116 }
117 }
118}
119
120impl<R: ParentResolver + Clone> ObservedProjectionSink for ThreadingProjection<R> {
121 fn on_kernel_event(&self, event: &KernelEvent) {
122 if let Ok(mut state) = self.state.lock() {
123 state.ingest(event);
124 }
125 }
126}
127
128impl<R: ParentResolver + Clone> ThreadingState<R> {
129 fn ingest(&mut self, event: &KernelEvent) {
130 let known = self.edges.contains_key(event.id.as_str());
131 if !known {
132 self.evict_oldest_if_full();
133 }
134 let edge = ThreadEdge {
135 event_id: event.id.clone(),
136 author_pubkey: event.author.clone(),
137 kind: event.kind,
138 created_at: event.created_at,
139 parent: self.resolver.parent(event),
140 root: self.resolver.root(event),
141 parent_author_pubkey: self.resolver.parent_author(event),
142 };
143 if known {
144 let _ = self.grouper.on_replace(&event.id, event);
145 } else {
146 let _ = self.grouper.on_insert(event);
147 }
148 self.edges.insert(edge.event_id.clone(), edge);
149 }
150
151 fn evict_oldest_if_full(&mut self) {
152 if self.edges.len() < self.edges.capacity() {
153 return;
154 }
155 let Some((oldest_id, _)) = self.edges.first() else {
156 return;
157 };
158 let oldest_id = oldest_id.clone();
159 self.edges.remove(oldest_id.as_str());
160 let _ = self.grouper.on_remove(&oldest_id);
161 }
162
163 fn snapshot(&self) -> ThreadingSnapshot {
164 let mut edges: Vec<ThreadEdge> = self.edges.values().cloned().collect();
165 edges.sort_by(|a, b| a.event_id.cmp(&b.event_id));
166 ThreadingSnapshot {
167 edges,
168 blocks: self.grouper.blocks().to_vec(),
169 pending_ancestor_ids: self
170 .grouper
171 .pending_ancestor_ids()
172 .iter()
173 .cloned()
174 .collect(),
175 }
176 }
177}