// hashtree_cli/nostrdb_integration/snapshot.rs

use std::collections::{BTreeMap, HashMap, HashSet};

use anyhow::{Context, Result};
use bytes::{Bytes, BytesMut};
use nostr::{Filter, Kind, PublicKey, Timestamp};
use nostrdb_social::{Filter as NdbFilter, Transaction};

use super::Ndb;
/// Version tag written at the head of every snapshot stream; bump when the
/// binary layout changes incompatibly.
const BINARY_FORMAT_VERSION: u64 = 2;
/// Target size of each emitted `Bytes` chunk (16 KiB); only the final chunk
/// may be shorter.
const CHUNK_SIZE: usize = 16 * 1024;
/// Hard ceiling on how many mute targets are ever fetched for one owner.
const MAX_MUTE_FETCH: usize = 100_000;
13
/// Limits applied while walking the social graph to build a snapshot.
/// Every field is optional; `None` means "no limit" for that dimension.
#[derive(Debug, Clone, Copy, Default)]
pub struct SnapshotOptions {
    /// Maximum number of distinct pubkeys (nodes) admitted into the snapshot.
    pub max_nodes: Option<usize>,
    /// Maximum total number of edges (follows + mutes) across the snapshot.
    pub max_edges: Option<usize>,
    /// Maximum BFS distance (hops from the root) to traverse.
    pub max_distance: Option<u32>,
    /// Maximum number of edges (follows + mutes combined) emitted per owner.
    pub max_edges_per_node: Option<usize>,
}
21
/// Intermediate, fully-resolved graph data gathered before binary encoding.
#[derive(Debug, Clone)]
struct SnapshotData {
    /// All admitted pubkeys in first-use order; a pubkey's index in this Vec
    /// becomes its numeric node id in the encoded stream.
    used_order: Vec<[u8; 32]>,
    /// Owners contributing at least one follow edge, in first-seen order.
    follow_owners: Vec<[u8; 32]>,
    /// Owners contributing at least one mute edge, in first-seen order.
    mute_owners: Vec<[u8; 32]>,
    /// owner pubkey -> followed pubkeys included in the snapshot.
    follow_targets: HashMap<[u8; 32], Vec<[u8; 32]>>,
    /// owner pubkey -> muted pubkeys included in the snapshot.
    mute_targets: HashMap<[u8; 32], Vec<[u8; 32]>>,
    /// owner pubkey -> created_at of their latest contact-list event (0 if none).
    follow_created_at: HashMap<[u8; 32], u64>,
    /// owner pubkey -> created_at of their latest mute-list event (0 if none).
    mute_created_at: HashMap<[u8; 32], u64>,
}
32
33pub fn build_snapshot_chunks(
34 ndb: &Ndb,
35 root: &[u8; 32],
36 options: &SnapshotOptions,
37) -> Result<Vec<Bytes>> {
38 let data = build_snapshot_data(ndb, root, options)?;
39 Ok(encode_snapshot_chunks(&data))
40}
41
/// Walks the follow graph out from `root` and collects the nodes, follow
/// edges, mute edges, and list timestamps that fit within `options`' limits.
///
/// Traversal is breadth-first over follow edges: owners at smaller distances
/// are processed first, so the global limits (`max_nodes`, `max_edges`)
/// favor the immediate neighborhood of the root.
fn build_snapshot_data(
    ndb: &Ndb,
    root: &[u8; 32],
    options: &SnapshotOptions,
) -> Result<SnapshotData> {
    let txn = Transaction::new(ndb).context("create nostrdb transaction")?;

    // BFS layering of reachable users; only follow edges determine distance.
    let users_by_distance = compute_users_by_distance(&txn, ndb, root, options.max_distance);

    // Per-owner fetch caches so each list is read from nostrdb at most once.
    let mut follow_cache: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
    let mut mute_cache: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();

    // Nodes admitted into the snapshot; `used_order` preserves first-use
    // order, which later defines the numeric node ids in the encoding.
    let mut used_nodes: HashSet<[u8; 32]> = HashSet::new();
    let mut used_order: Vec<[u8; 32]> = Vec::new();

    // Admitted edges, keyed by owner, plus the owners themselves in
    // first-seen order (the *_owner_set mirrors prevent duplicates).
    let mut follow_targets: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
    let mut mute_targets: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
    let mut follow_owners: Vec<[u8; 32]> = Vec::new();
    let mut mute_owners: Vec<[u8; 32]> = Vec::new();
    let mut follow_owner_set: HashSet<[u8; 32]> = HashSet::new();
    let mut mute_owner_set: HashSet<[u8; 32]> = HashSet::new();

    // Total edges admitted so far (follows + mutes), checked against
    // `options.max_edges`.
    let mut edge_count: usize = 0;

    // Labeled so hitting a *global* limit deep inside the target loops can
    // abort the whole edge-collection phase at once.
    'edges: for (distance, owners) in users_by_distance {
        if let Some(max_distance) = options.max_distance {
            if distance > max_distance {
                break;
            }
        }

        for owner in owners {
            // Per-owner budget shared by this owner's follows AND mutes.
            let mut owner_edge_count: usize = 0;
            let mut per_node_limit_reached = false;

            let follows = get_followed_cached(&txn, ndb, &mut follow_cache, &owner);
            for target in follows {
                // Per-owner limit: stop this owner's follows; mutes are
                // skipped below as well since the budget is exhausted.
                if let Some(limit) = options.max_edges_per_node {
                    if owner_edge_count >= limit {
                        per_node_limit_reached = true;
                        break;
                    }
                }

                // Global edge budget: abort all edge collection.
                if let Some(max_edges) = options.max_edges {
                    if edge_count >= max_edges {
                        break 'edges;
                    }
                }

                // Global node budget: check BEFORE inserting, so an edge is
                // only admitted if both of its endpoints fit.
                if let Some(max_nodes) = options.max_nodes {
                    let mut new_nodes = 0usize;
                    if !used_nodes.contains(&owner) {
                        new_nodes += 1;
                    }
                    if !used_nodes.contains(&target) {
                        new_nodes += 1;
                    }
                    if used_nodes.len() + new_nodes > max_nodes {
                        break 'edges;
                    }
                }

                if used_nodes.insert(owner) {
                    used_order.push(owner);
                }
                if used_nodes.insert(target) {
                    used_order.push(target);
                }

                follow_targets.entry(owner).or_default().push(target);
                if follow_owner_set.insert(owner) {
                    follow_owners.push(owner);
                }

                edge_count += 1;
                owner_edge_count += 1;
            }

            // Follows already exhausted this owner's per-node budget; no
            // mute edges for this owner.
            if per_node_limit_reached {
                continue;
            }

            let mutes = get_muted_cached(
                &txn,
                ndb,
                &mut mute_cache,
                &owner,
                options.max_edges_per_node,
            );
            for target in mutes {
                // Same three-stage limit checks as the follow loop above.
                if let Some(limit) = options.max_edges_per_node {
                    if owner_edge_count >= limit {
                        break;
                    }
                }

                if let Some(max_edges) = options.max_edges {
                    if edge_count >= max_edges {
                        break 'edges;
                    }
                }

                if let Some(max_nodes) = options.max_nodes {
                    let mut new_nodes = 0usize;
                    if !used_nodes.contains(&owner) {
                        new_nodes += 1;
                    }
                    if !used_nodes.contains(&target) {
                        new_nodes += 1;
                    }
                    if used_nodes.len() + new_nodes > max_nodes {
                        break 'edges;
                    }
                }

                if used_nodes.insert(owner) {
                    used_order.push(owner);
                }
                if used_nodes.insert(target) {
                    used_order.push(target);
                }

                mute_targets.entry(owner).or_default().push(target);
                if mute_owner_set.insert(owner) {
                    mute_owners.push(owner);
                }

                edge_count += 1;
                owner_edge_count += 1;
            }
        }
    }

    // Resolve list timestamps only for owners that actually contributed
    // edges; failures fall back to 0 (treated as "unknown").
    let mut follow_created_at = HashMap::new();
    let mut mute_created_at = HashMap::new();
    for owner in &follow_owners {
        let ts = latest_created_at(&txn, ndb, owner, Kind::ContactList).unwrap_or(0);
        follow_created_at.insert(*owner, ts);
    }
    for owner in &mute_owners {
        let ts = latest_created_at(&txn, ndb, owner, Kind::MuteList).unwrap_or(0);
        mute_created_at.insert(*owner, ts);
    }

    Ok(SnapshotData {
        used_order,
        follow_owners,
        mute_owners,
        follow_targets,
        mute_targets,
        follow_created_at,
        mute_created_at,
    })
}
197
198fn compute_users_by_distance(
199 txn: &Transaction,
200 ndb: &Ndb,
201 root: &[u8; 32],
202 max_distance: Option<u32>,
203) -> BTreeMap<u32, Vec<[u8; 32]>> {
204 let mut visited: HashSet<[u8; 32]> = HashSet::new();
205 let mut by_distance: BTreeMap<u32, Vec<[u8; 32]>> = BTreeMap::new();
206
207 let mut current: Vec<[u8; 32]> = vec![*root];
208 visited.insert(*root);
209 by_distance.insert(0, current.clone());
210
211 let mut depth: u32 = 0;
212 loop {
213 if let Some(max_distance) = max_distance {
214 if depth >= max_distance {
215 break;
216 }
217 }
218
219 if current.is_empty() {
220 break;
221 }
222
223 let mut next: Vec<[u8; 32]> = Vec::new();
224 for owner in ¤t {
225 let follows = get_followed_full(txn, ndb, owner);
226 for target in follows {
227 if visited.insert(target) {
228 next.push(target);
229 }
230 }
231 }
232
233 depth += 1;
234 if !next.is_empty() {
235 by_distance.insert(depth, next.clone());
236 }
237 current = next;
238 }
239
240 by_distance
241}
242
243fn get_followed_cached(
244 txn: &Transaction,
245 ndb: &Ndb,
246 cache: &mut HashMap<[u8; 32], Vec<[u8; 32]>>,
247 owner: &[u8; 32],
248) -> Vec<[u8; 32]> {
249 if let Some(existing) = cache.get(owner) {
250 return existing.clone();
251 }
252 let follows = get_followed_full(txn, ndb, owner);
253 cache.insert(*owner, follows.clone());
254 follows
255}
256
257fn get_muted_cached(
258 txn: &Transaction,
259 ndb: &Ndb,
260 cache: &mut HashMap<[u8; 32], Vec<[u8; 32]>>,
261 owner: &[u8; 32],
262 max_edges_per_node: Option<usize>,
263) -> Vec<[u8; 32]> {
264 if let Some(existing) = cache.get(owner) {
265 return existing.clone();
266 }
267 let mutes = get_muted_full(txn, ndb, owner, max_edges_per_node);
268 cache.insert(*owner, mutes.clone());
269 mutes
270}
271
272fn get_followed_full(txn: &Transaction, ndb: &Ndb, owner: &[u8; 32]) -> Vec<[u8; 32]> {
273 let count = nostrdb_social::socialgraph::followed_count(txn, ndb, owner);
274 let max = count.min(i32::MAX as usize);
275 nostrdb_social::socialgraph::get_followed(txn, ndb, owner, max)
276}
277
278fn get_muted_full(
279 txn: &Transaction,
280 ndb: &Ndb,
281 owner: &[u8; 32],
282 max_edges_per_node: Option<usize>,
283) -> Vec<[u8; 32]> {
284 if let Some(limit) = max_edges_per_node {
285 let capped = limit.min(i32::MAX as usize);
286 return nostrdb_social::socialgraph::get_muted(txn, ndb, owner, capped);
287 }
288
289 let mut max_out = 1024usize.min(MAX_MUTE_FETCH);
290 loop {
291 let mutes = nostrdb_social::socialgraph::get_muted(txn, ndb, owner, max_out);
292 if mutes.len() < max_out || max_out >= MAX_MUTE_FETCH {
293 return mutes;
294 }
295 max_out = (max_out * 2).min(MAX_MUTE_FETCH);
296 }
297}
298
/// Returns the `created_at` of the newest event of `kind` authored by
/// `owner`, or 0 when none exists (or the pubkey bytes are invalid).
///
/// nostrdb queries are paged: each pass requests up to 512 events with
/// `created_at >= since`, records the largest timestamp seen, and advances
/// `since` past it until a short page signals the end.
///
/// # Errors
/// Fails only on filter serialization or the nostrdb query itself.
fn latest_created_at(txn: &Transaction, ndb: &Ndb, owner: &[u8; 32], kind: Kind) -> Result<u64> {
    // An invalid pubkey is treated as "no events" rather than an error.
    let pubkey = match PublicKey::from_slice(owner) {
        Ok(pk) => pk,
        Err(_) => return Ok(0),
    };

    let mut max_seen = 0u64;
    let mut since = 0u64;
    let max_results = 512usize;

    loop {
        let filter = Filter::new()
            .author(pubkey)
            .kind(kind)
            .since(Timestamp::from_secs(since))
            .limit(max_results);
        // The nostr filter is round-tripped through JSON to build the
        // nostrdb-native filter.
        let filter_json = serde_json::to_string(&filter).context("serialize filter")?;
        let ndb_filter = NdbFilter::from_json(&filter_json).context("build nostrdb filter")?;

        let results = ndb
            .query(txn, &[ndb_filter], max_results as i32)
            .context("query nostrdb")?;

        if results.is_empty() {
            break;
        }

        for result in &results {
            let ts = result.note.created_at();
            if ts > max_seen {
                max_seen = ts;
            }
        }

        // A short page means there is nothing newer to fetch.
        if results.len() < max_results {
            break;
        }

        // Advance past the newest timestamp seen so the next page is
        // strictly newer; the else-branch guards against a stuck cursor.
        if max_seen >= since {
            since = max_seen.saturating_add(1);
        } else {
            break;
        }
    }

    Ok(max_seen)
}
346
347fn encode_snapshot_chunks(data: &SnapshotData) -> Vec<Bytes> {
348 let mut id_map: HashMap<[u8; 32], u32> = HashMap::new();
349 for (idx, pk) in data.used_order.iter().enumerate() {
350 id_map.insert(*pk, idx as u32);
351 }
352
353 let mut writer = ChunkWriter::new();
354 writer.write_varint(BINARY_FORMAT_VERSION);
355
356 writer.write_varint(data.used_order.len() as u64);
357 for (idx, pk) in data.used_order.iter().enumerate() {
358 writer.write_bytes(pk);
359 writer.write_varint(idx as u64);
360 }
361
362 writer.write_varint(data.follow_owners.len() as u64);
363 for owner in &data.follow_owners {
364 let owner_id = id_map.get(owner).copied().unwrap_or_default();
365 let ts = data.follow_created_at.get(owner).copied().unwrap_or(0);
366 let targets = data.follow_targets.get(owner).cloned().unwrap_or_default();
367
368 writer.write_varint(owner_id as u64);
369 writer.write_varint(ts);
370 writer.write_varint(targets.len() as u64);
371 for target in targets {
372 let target_id = id_map.get(&target).copied().unwrap_or_default();
373 writer.write_varint(target_id as u64);
374 }
375 }
376
377 writer.write_varint(data.mute_owners.len() as u64);
378 for owner in &data.mute_owners {
379 let owner_id = id_map.get(owner).copied().unwrap_or_default();
380 let ts = data.mute_created_at.get(owner).copied().unwrap_or(0);
381 let targets = data.mute_targets.get(owner).cloned().unwrap_or_default();
382
383 writer.write_varint(owner_id as u64);
384 writer.write_varint(ts);
385 writer.write_varint(targets.len() as u64);
386 for target in targets {
387 let target_id = id_map.get(&target).copied().unwrap_or_default();
388 writer.write_varint(target_id as u64);
389 }
390 }
391
392 writer.finish()
393}
394
395struct ChunkWriter {
396 buf: BytesMut,
397 chunks: Vec<Bytes>,
398}
399
400impl ChunkWriter {
401 fn new() -> Self {
402 Self {
403 buf: BytesMut::with_capacity(CHUNK_SIZE),
404 chunks: Vec::new(),
405 }
406 }
407
408 fn write_bytes(&mut self, bytes: &[u8]) {
409 let mut offset = 0;
410 while offset < bytes.len() {
411 let remaining = CHUNK_SIZE - self.buf.len();
412 if remaining == 0 {
413 self.flush();
414 continue;
415 }
416 let to_write = remaining.min(bytes.len() - offset);
417 self.buf
418 .extend_from_slice(&bytes[offset..offset + to_write]);
419 offset += to_write;
420 }
421 }
422
423 fn write_varint(&mut self, mut value: u64) {
424 while value >= 0x80 {
425 let byte = ((value as u8) & 0x7f) | 0x80;
426 self.write_bytes(&[byte]);
427 value >>= 7;
428 }
429 self.write_bytes(&[(value as u8) & 0x7f]);
430 }
431
432 fn flush(&mut self) {
433 if self.buf.is_empty() {
434 return;
435 }
436 let chunk = self.buf.split().freeze();
437 self.chunks.push(chunk);
438 }
439
440 fn finish(mut self) -> Vec<Bytes> {
441 self.flush();
442 self.chunks
443 }
444}