Skip to main content

hashtree_cli/nostrdb_integration/
snapshot.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use anyhow::{Context, Result};
4use bytes::{Bytes, BytesMut};
5use nostr::{Filter, Kind, PublicKey, Timestamp};
6use nostrdb_social::{Filter as NdbFilter, Transaction};
7
8use super::Ndb;
9
/// Version number written as the first varint of every encoded snapshot.
const BINARY_FORMAT_VERSION: u64 = 2;
/// Size in bytes at which the chunk writer cuts a new chunk (16 KiB).
const CHUNK_SIZE: usize = 16 * 1024;
/// Upper bound on how many muted keys are requested for one owner when the caller
/// supplied no per-node edge limit.
const MAX_MUTE_FETCH: usize = 100_000;
13
/// Optional limits applied while collecting a snapshot.
///
/// Every field defaults to `None`, which leaves that dimension unbounded.
#[derive(Debug, Clone, Copy, Default)]
pub struct SnapshotOptions {
    /// Maximum number of distinct public keys in the snapshot; collection stops at the
    /// first edge that would exceed it.
    pub max_nodes: Option<usize>,
    /// Maximum total number of edges (follows plus mutes); collection stops once reached.
    pub max_edges: Option<usize>,
    /// Maximum follow distance from the root for a key to be processed as an edge owner.
    pub max_distance: Option<u32>,
    /// Maximum number of edges (follows plus mutes combined) recorded per owner.
    pub max_edges_per_node: Option<usize>,
}
21
/// In-memory form of a snapshot, produced by `build_snapshot_data` and consumed by
/// `encode_snapshot_chunks`.
#[derive(Debug, Clone)]
struct SnapshotData {
    /// Every public key that appears in an edge, in first-seen order; a key's position
    /// here is the numeric id used for it in the encoded output.
    used_order: Vec<[u8; 32]>,
    /// Owners that have at least one recorded follow edge, in first-seen order.
    follow_owners: Vec<[u8; 32]>,
    /// Owners that have at least one recorded mute edge, in first-seen order.
    mute_owners: Vec<[u8; 32]>,
    /// Recorded follow targets, keyed by owner.
    follow_targets: HashMap<[u8; 32], Vec<[u8; 32]>>,
    /// Recorded mute targets, keyed by owner.
    mute_targets: HashMap<[u8; 32], Vec<[u8; 32]>>,
    /// Newest `created_at` found among each follow owner's contact-list events
    /// (0 when the lookup failed or found nothing).
    follow_created_at: HashMap<[u8; 32], u64>,
    /// Newest `created_at` found among each mute owner's mute-list events
    /// (0 when the lookup failed or found nothing).
    mute_created_at: HashMap<[u8; 32], u64>,
}
32
33pub fn build_snapshot_chunks(
34    ndb: &Ndb,
35    root: &[u8; 32],
36    options: &SnapshotOptions,
37) -> Result<Vec<Bytes>> {
38    let data = build_snapshot_data(ndb, root, options)?;
39    Ok(encode_snapshot_chunks(&data))
40}
41
42fn build_snapshot_data(
43    ndb: &Ndb,
44    root: &[u8; 32],
45    options: &SnapshotOptions,
46) -> Result<SnapshotData> {
47    let txn = Transaction::new(ndb).context("create nostrdb transaction")?;
48
49    let users_by_distance = compute_users_by_distance(&txn, ndb, root, options.max_distance);
50
51    let mut follow_cache: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
52    let mut mute_cache: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
53
54    let mut used_nodes: HashSet<[u8; 32]> = HashSet::new();
55    let mut used_order: Vec<[u8; 32]> = Vec::new();
56
57    let mut follow_targets: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
58    let mut mute_targets: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
59    let mut follow_owners: Vec<[u8; 32]> = Vec::new();
60    let mut mute_owners: Vec<[u8; 32]> = Vec::new();
61    let mut follow_owner_set: HashSet<[u8; 32]> = HashSet::new();
62    let mut mute_owner_set: HashSet<[u8; 32]> = HashSet::new();
63
64    let mut edge_count: usize = 0;
65
66    'edges: for (distance, owners) in users_by_distance {
67        if let Some(max_distance) = options.max_distance {
68            if distance > max_distance {
69                break;
70            }
71        }
72
73        for owner in owners {
74            let mut owner_edge_count: usize = 0;
75            let mut per_node_limit_reached = false;
76
77            let follows = get_followed_cached(&txn, ndb, &mut follow_cache, &owner);
78            for target in follows {
79                if let Some(limit) = options.max_edges_per_node {
80                    if owner_edge_count >= limit {
81                        per_node_limit_reached = true;
82                        break;
83                    }
84                }
85
86                if let Some(max_edges) = options.max_edges {
87                    if edge_count >= max_edges {
88                        break 'edges;
89                    }
90                }
91
92                if let Some(max_nodes) = options.max_nodes {
93                    let mut new_nodes = 0usize;
94                    if !used_nodes.contains(&owner) {
95                        new_nodes += 1;
96                    }
97                    if !used_nodes.contains(&target) {
98                        new_nodes += 1;
99                    }
100                    if used_nodes.len() + new_nodes > max_nodes {
101                        break 'edges;
102                    }
103                }
104
105                if used_nodes.insert(owner) {
106                    used_order.push(owner);
107                }
108                if used_nodes.insert(target) {
109                    used_order.push(target);
110                }
111
112                follow_targets.entry(owner).or_default().push(target);
113                if follow_owner_set.insert(owner) {
114                    follow_owners.push(owner);
115                }
116
117                edge_count += 1;
118                owner_edge_count += 1;
119            }
120
121            if per_node_limit_reached {
122                continue;
123            }
124
125            let mutes = get_muted_cached(
126                &txn,
127                ndb,
128                &mut mute_cache,
129                &owner,
130                options.max_edges_per_node,
131            );
132            for target in mutes {
133                if let Some(limit) = options.max_edges_per_node {
134                    if owner_edge_count >= limit {
135                        break;
136                    }
137                }
138
139                if let Some(max_edges) = options.max_edges {
140                    if edge_count >= max_edges {
141                        break 'edges;
142                    }
143                }
144
145                if let Some(max_nodes) = options.max_nodes {
146                    let mut new_nodes = 0usize;
147                    if !used_nodes.contains(&owner) {
148                        new_nodes += 1;
149                    }
150                    if !used_nodes.contains(&target) {
151                        new_nodes += 1;
152                    }
153                    if used_nodes.len() + new_nodes > max_nodes {
154                        break 'edges;
155                    }
156                }
157
158                if used_nodes.insert(owner) {
159                    used_order.push(owner);
160                }
161                if used_nodes.insert(target) {
162                    used_order.push(target);
163                }
164
165                mute_targets.entry(owner).or_default().push(target);
166                if mute_owner_set.insert(owner) {
167                    mute_owners.push(owner);
168                }
169
170                edge_count += 1;
171                owner_edge_count += 1;
172            }
173        }
174    }
175
176    let mut follow_created_at = HashMap::new();
177    let mut mute_created_at = HashMap::new();
178    for owner in &follow_owners {
179        let ts = latest_created_at(&txn, ndb, owner, Kind::ContactList).unwrap_or(0);
180        follow_created_at.insert(*owner, ts);
181    }
182    for owner in &mute_owners {
183        let ts = latest_created_at(&txn, ndb, owner, Kind::MuteList).unwrap_or(0);
184        mute_created_at.insert(*owner, ts);
185    }
186
187    Ok(SnapshotData {
188        used_order,
189        follow_owners,
190        mute_owners,
191        follow_targets,
192        mute_targets,
193        follow_created_at,
194        mute_created_at,
195    })
196}
197
198fn compute_users_by_distance(
199    txn: &Transaction,
200    ndb: &Ndb,
201    root: &[u8; 32],
202    max_distance: Option<u32>,
203) -> BTreeMap<u32, Vec<[u8; 32]>> {
204    let mut visited: HashSet<[u8; 32]> = HashSet::new();
205    let mut by_distance: BTreeMap<u32, Vec<[u8; 32]>> = BTreeMap::new();
206
207    let mut current: Vec<[u8; 32]> = vec![*root];
208    visited.insert(*root);
209    by_distance.insert(0, current.clone());
210
211    let mut depth: u32 = 0;
212    loop {
213        if let Some(max_distance) = max_distance {
214            if depth >= max_distance {
215                break;
216            }
217        }
218
219        if current.is_empty() {
220            break;
221        }
222
223        let mut next: Vec<[u8; 32]> = Vec::new();
224        for owner in &current {
225            let follows = get_followed_full(txn, ndb, owner);
226            for target in follows {
227                if visited.insert(target) {
228                    next.push(target);
229                }
230            }
231        }
232
233        depth += 1;
234        if !next.is_empty() {
235            by_distance.insert(depth, next.clone());
236        }
237        current = next;
238    }
239
240    by_distance
241}
242
243fn get_followed_cached(
244    txn: &Transaction,
245    ndb: &Ndb,
246    cache: &mut HashMap<[u8; 32], Vec<[u8; 32]>>,
247    owner: &[u8; 32],
248) -> Vec<[u8; 32]> {
249    if let Some(existing) = cache.get(owner) {
250        return existing.clone();
251    }
252    let follows = get_followed_full(txn, ndb, owner);
253    cache.insert(*owner, follows.clone());
254    follows
255}
256
257fn get_muted_cached(
258    txn: &Transaction,
259    ndb: &Ndb,
260    cache: &mut HashMap<[u8; 32], Vec<[u8; 32]>>,
261    owner: &[u8; 32],
262    max_edges_per_node: Option<usize>,
263) -> Vec<[u8; 32]> {
264    if let Some(existing) = cache.get(owner) {
265        return existing.clone();
266    }
267    let mutes = get_muted_full(txn, ndb, owner, max_edges_per_node);
268    cache.insert(*owner, mutes.clone());
269    mutes
270}
271
272fn get_followed_full(txn: &Transaction, ndb: &Ndb, owner: &[u8; 32]) -> Vec<[u8; 32]> {
273    let count = nostrdb_social::socialgraph::followed_count(txn, ndb, owner);
274    let max = count.min(i32::MAX as usize);
275    nostrdb_social::socialgraph::get_followed(txn, ndb, owner, max)
276}
277
278fn get_muted_full(
279    txn: &Transaction,
280    ndb: &Ndb,
281    owner: &[u8; 32],
282    max_edges_per_node: Option<usize>,
283) -> Vec<[u8; 32]> {
284    if let Some(limit) = max_edges_per_node {
285        let capped = limit.min(i32::MAX as usize);
286        return nostrdb_social::socialgraph::get_muted(txn, ndb, owner, capped);
287    }
288
289    let mut max_out = 1024usize.min(MAX_MUTE_FETCH);
290    loop {
291        let mutes = nostrdb_social::socialgraph::get_muted(txn, ndb, owner, max_out);
292        if mutes.len() < max_out || max_out >= MAX_MUTE_FETCH {
293            return mutes;
294        }
295        max_out = (max_out * 2).min(MAX_MUTE_FETCH);
296    }
297}
298
299fn latest_created_at(txn: &Transaction, ndb: &Ndb, owner: &[u8; 32], kind: Kind) -> Result<u64> {
300    let pubkey = match PublicKey::from_slice(owner) {
301        Ok(pk) => pk,
302        Err(_) => return Ok(0),
303    };
304
305    let mut max_seen = 0u64;
306    let mut since = 0u64;
307    let max_results = 512usize;
308
309    loop {
310        let filter = Filter::new()
311            .author(pubkey)
312            .kind(kind)
313            .since(Timestamp::from_secs(since))
314            .limit(max_results);
315        let filter_json = serde_json::to_string(&filter).context("serialize filter")?;
316        let ndb_filter = NdbFilter::from_json(&filter_json).context("build nostrdb filter")?;
317
318        let results = ndb
319            .query(txn, &[ndb_filter], max_results as i32)
320            .context("query nostrdb")?;
321
322        if results.is_empty() {
323            break;
324        }
325
326        for result in &results {
327            let ts = result.note.created_at();
328            if ts > max_seen {
329                max_seen = ts;
330            }
331        }
332
333        if results.len() < max_results {
334            break;
335        }
336
337        if max_seen >= since {
338            since = max_seen.saturating_add(1);
339        } else {
340            break;
341        }
342    }
343
344    Ok(max_seen)
345}
346
347fn encode_snapshot_chunks(data: &SnapshotData) -> Vec<Bytes> {
348    let mut id_map: HashMap<[u8; 32], u32> = HashMap::new();
349    for (idx, pk) in data.used_order.iter().enumerate() {
350        id_map.insert(*pk, idx as u32);
351    }
352
353    let mut writer = ChunkWriter::new();
354    writer.write_varint(BINARY_FORMAT_VERSION);
355
356    writer.write_varint(data.used_order.len() as u64);
357    for (idx, pk) in data.used_order.iter().enumerate() {
358        writer.write_bytes(pk);
359        writer.write_varint(idx as u64);
360    }
361
362    writer.write_varint(data.follow_owners.len() as u64);
363    for owner in &data.follow_owners {
364        let owner_id = id_map.get(owner).copied().unwrap_or_default();
365        let ts = data.follow_created_at.get(owner).copied().unwrap_or(0);
366        let targets = data.follow_targets.get(owner).cloned().unwrap_or_default();
367
368        writer.write_varint(owner_id as u64);
369        writer.write_varint(ts);
370        writer.write_varint(targets.len() as u64);
371        for target in targets {
372            let target_id = id_map.get(&target).copied().unwrap_or_default();
373            writer.write_varint(target_id as u64);
374        }
375    }
376
377    writer.write_varint(data.mute_owners.len() as u64);
378    for owner in &data.mute_owners {
379        let owner_id = id_map.get(owner).copied().unwrap_or_default();
380        let ts = data.mute_created_at.get(owner).copied().unwrap_or(0);
381        let targets = data.mute_targets.get(owner).cloned().unwrap_or_default();
382
383        writer.write_varint(owner_id as u64);
384        writer.write_varint(ts);
385        writer.write_varint(targets.len() as u64);
386        for target in targets {
387            let target_id = id_map.get(&target).copied().unwrap_or_default();
388            writer.write_varint(target_id as u64);
389        }
390    }
391
392    writer.finish()
393}
394
395struct ChunkWriter {
396    buf: BytesMut,
397    chunks: Vec<Bytes>,
398}
399
400impl ChunkWriter {
401    fn new() -> Self {
402        Self {
403            buf: BytesMut::with_capacity(CHUNK_SIZE),
404            chunks: Vec::new(),
405        }
406    }
407
408    fn write_bytes(&mut self, bytes: &[u8]) {
409        let mut offset = 0;
410        while offset < bytes.len() {
411            let remaining = CHUNK_SIZE - self.buf.len();
412            if remaining == 0 {
413                self.flush();
414                continue;
415            }
416            let to_write = remaining.min(bytes.len() - offset);
417            self.buf
418                .extend_from_slice(&bytes[offset..offset + to_write]);
419            offset += to_write;
420        }
421    }
422
423    fn write_varint(&mut self, mut value: u64) {
424        while value >= 0x80 {
425            let byte = ((value as u8) & 0x7f) | 0x80;
426            self.write_bytes(&[byte]);
427            value >>= 7;
428        }
429        self.write_bytes(&[(value as u8) & 0x7f]);
430    }
431
432    fn flush(&mut self) {
433        if self.buf.is_empty() {
434            return;
435        }
436        let chunk = self.buf.split().freeze();
437        self.chunks.push(chunk);
438    }
439
440    fn finish(mut self) -> Vec<Bytes> {
441        self.flush();
442        self.chunks
443    }
444}