hashtree_cli/
sync.rs

1//! Background sync service for auto-pulling trees from Nostr
2//!
3//! Subscribes to:
4//! 1. Own trees (all visibility levels) - highest priority
5//! 2. Followed users' public trees - lower priority
6//!
7//! Uses WebRTC peers first, falls back to Blossom HTTP servers
8
use anyhow::Result;
use hashtree_core::{from_hex, to_hex, Cid};
use nostr_sdk::prelude::*;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{error, info, warn};

use crate::fetch::{FetchConfig, Fetcher};
use crate::storage::{HashtreeStore, PRIORITY_OWN, PRIORITY_FOLLOWED};
use crate::webrtc::WebRTCState;
22
/// Sync priority levels.
///
/// Lower discriminant = more urgent; the derived `Ord` makes `Own` sort
/// before `Followed`, which the queue-insertion logic relies on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Own trees - highest priority
    Own = 0,
    /// Followed users' trees - lower priority
    Followed = 1,
}
31
/// A tree to sync
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Nostr key (npub.../treename)
    pub key: String,
    /// Content identifier of the tree root to fetch
    pub cid: Cid,
    /// Priority level — determines where the task is inserted in the queue
    pub priority: SyncPriority,
    /// When this task was queued (not currently used for expiry)
    pub queued_at: Instant,
}
44
/// Configuration for background sync
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Enable syncing own trees
    pub sync_own: bool,
    /// Enable syncing followed users' public trees
    pub sync_followed: bool,
    /// Blossom servers for HTTP fallback when no WebRTC peer has the data
    pub blossom_servers: Vec<String>,
    /// Nostr relays for subscriptions
    pub relays: Vec<String>,
    /// Max concurrent sync tasks the worker will run at once
    pub max_concurrent: usize,
    /// Timeout for WebRTC requests (ms)
    pub webrtc_timeout_ms: u64,
    /// Timeout for Blossom requests (ms)
    pub blossom_timeout_ms: u64,
}
63
64impl Default for SyncConfig {
65    fn default() -> Self {
66        Self {
67            sync_own: true,
68            sync_followed: true,
69            blossom_servers: vec!["https://blossom.iris.to".to_string()],
70            relays: vec![
71                "wss://relay.damus.io".to_string(),
72                "wss://relay.primal.net".to_string(),
73                "wss://nos.lol".to_string(),
74            ],
75            max_concurrent: 3,
76            webrtc_timeout_ms: 2000,
77            blossom_timeout_ms: 10000,
78        }
79    }
80}
81
/// State for a subscribed tree
#[allow(dead_code)] // some fields are kept for debugging / future use
struct TreeSubscription {
    /// "npub.../treename" identifier
    key: String,
    /// Latest CID announced for this tree (None until first event seen)
    current_cid: Option<Cid>,
    /// Priority assigned when the subscription entry was created
    priority: SyncPriority,
    /// When the tree was last synced — never written in this file's visible code
    last_synced: Option<Instant>,
}
90
/// Background sync service
///
/// Owns the Nostr client, per-tree subscription state, and a
/// priority-ordered queue of [`SyncTask`]s drained by a worker task
/// spawned from [`BackgroundSync::run`].
pub struct BackgroundSync {
    /// Relays, Blossom servers, concurrency limit and timeouts
    config: SyncConfig,
    /// Local store; receives fetched chunks and tracks eviction indexing
    store: Arc<HashtreeStore>,
    /// Optional WebRTC state — when present, peers are tried before Blossom
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client for subscriptions
    client: Client,
    /// Our public key (used to classify events as Own vs Followed)
    my_pubkey: PublicKey,
    /// Subscribed trees, keyed by "npub.../treename"
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Sync queue, kept ordered so own trees drain first
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Currently syncing hashes (hex) — guards against duplicate fetches
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Shutdown signal (watch channel observed by both worker and run loop)
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Fetcher for remote content
    fetcher: Arc<Fetcher>,
}
112
113impl BackgroundSync {
    /// Create a new background sync service
    ///
    /// Builds a Nostr client from `keys`, registers the configured relays
    /// (a relay that fails to add is logged and skipped, not fatal),
    /// initiates the relay connection, and constructs the fetcher used
    /// for WebRTC/Blossom pulls. No background work runs until
    /// [`run`](Self::run) is called.
    pub async fn new(
        config: SyncConfig,
        store: Arc<HashtreeStore>,
        keys: Keys,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Result<Self> {
        let my_pubkey = keys.public_key();
        let client = Client::new(keys);

        // Add relays — best-effort: one bad relay shouldn't abort construction
        for relay in &config.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }

        // Connect to relays
        client.connect().await;

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        // Create fetcher with config-derived timeouts and Blossom fallback servers
        let fetch_config = FetchConfig {
            blossom_servers: config.blossom_servers.clone(),
            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
        };
        let fetcher = Arc::new(Fetcher::new(fetch_config));

        Ok(Self {
            config,
            store,
            webrtc_state,
            client,
            my_pubkey,
            subscriptions: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(RwLock::new(VecDeque::new())),
            syncing: Arc::new(RwLock::new(HashSet::new())),
            shutdown_tx,
            shutdown_rx,
            fetcher,
        })
    }
158
    /// Start the background sync service
    ///
    /// Sets up Nostr subscriptions (own trees and/or followed users'
    /// trees per config), spawns a detached worker that drains the sync
    /// queue, then loops over relay notifications until shutdown is
    /// signalled or the notification channel errors out.
    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
        info!("Starting background sync service");

        // Wait for relays to connect before subscribing
        // NOTE(review): a fixed 3 s delay is a heuristic, not a readiness check
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Subscribe to own trees
        if self.config.sync_own {
            self.subscribe_own_trees().await?;
        }

        // Subscribe to followed users' trees
        if self.config.sync_followed {
            self.subscribe_followed_trees(&contacts_file).await?;
        }

        // Clone shared handles for the sync worker
        let queue = self.queue.clone();
        let syncing = self.syncing.clone();
        let store = self.store.clone();
        let webrtc_state = self.webrtc_state.clone();
        let fetcher = self.fetcher.clone();
        let max_concurrent = self.config.max_concurrent;
        let mut shutdown_rx = self.shutdown_rx.clone();

        // Spawn sync worker task: polls the queue every 500 ms and fans out
        // fetches, keeping at most `max_concurrent` in flight.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            info!("Sync worker shutting down");
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        // Check if we can start more sync tasks
                        let current_syncing = syncing.read().await.len();
                        if current_syncing >= max_concurrent {
                            continue;
                        }

                        // Get next task from queue (write lock scoped to this block)
                        let task = {
                            let mut q = queue.write().await;
                            q.pop_front()
                        };

                        if let Some(task) = task {
                            let hash_hex = to_hex(&task.cid.hash);

                            // Check if already syncing.
                            // NOTE(review): a duplicate task is dropped here, not
                            // requeued — if the in-flight sync fails, this update is lost.
                            {
                                let mut s = syncing.write().await;
                                if s.contains(&hash_hex) {
                                    continue;
                                }
                                s.insert(hash_hex.clone());
                            }

                            // Spawn sync task
                            let syncing_clone = syncing.clone();
                            let store_clone = store.clone();
                            let webrtc_clone = webrtc_state.clone();
                            let fetcher_clone = fetcher.clone();

                            tokio::spawn(async move {
                                // Fetcher tries WebRTC peers first, then Blossom HTTP
                                let result = fetcher_clone.fetch_tree(
                                    &store_clone,
                                    webrtc_clone.as_ref(),
                                    &task.cid.hash,
                                ).await;

                                match result {
                                    Ok((chunks_fetched, bytes_fetched)) => {
                                        if chunks_fetched > 0 {
                                            info!(
                                                "Synced tree {} ({} chunks, {} bytes)",
                                                &hash_hex[..12],
                                                chunks_fetched,
                                                bytes_fetched
                                            );

                                            // Index the tree for eviction tracking
                                            // Extract owner from key (format: "npub.../treename" or "pubkey/treename")
                                            let (owner, name) = task.key.split_once('/')
                                                .map(|(o, n)| (o.to_string(), Some(n)))
                                                .unwrap_or((task.key.clone(), None));

                                            // Map SyncPriority to storage priority
                                            let storage_priority = match task.priority {
                                                SyncPriority::Own => PRIORITY_OWN,
                                                SyncPriority::Followed => PRIORITY_FOLLOWED,
                                            };

                                            if let Err(e) = store_clone.index_tree(
                                                &task.cid.hash,
                                                &owner,
                                                name,
                                                storage_priority,
                                                Some(&task.key), // ref_key for replacing old versions
                                            ) {
                                                warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
                                            }

                                            // Check if eviction is needed; failure is non-fatal
                                            if let Err(e) = store_clone.evict_if_needed() {
                                                warn!("Eviction check failed: {}", e);
                                            }
                                        } else {
                                            // Zero chunks fetched: content was already local
                                            tracing::debug!("Tree {} already synced", &hash_hex[..12]);
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
                                    }
                                }

                                // Remove from syncing set so this hash can be retried later
                                syncing_clone.write().await.remove(&hash_hex);
                            });
                        }
                    }
                }
            }
        });

        // Handle Nostr notifications for tree updates (runs on the caller's task)
        let mut notifications = self.client.notifications();
        let subscriptions = self.subscriptions.clone();
        let queue = self.queue.clone();
        let mut shutdown_rx = self.shutdown_rx.clone();

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("Background sync shutting down");
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_tree_event(&event, &subscriptions, &queue).await;
                        }
                        Ok(_) => {}
                        Err(e) => {
                            // Notification channel failed (e.g. closed/lagged):
                            // exit the event loop; `run` returns Ok afterwards.
                            error!("Notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
320
321    /// Subscribe to own trees (kind 30078 events from our pubkey)
322    async fn subscribe_own_trees(&self) -> Result<()> {
323        let filter = Filter::new()
324            .kind(Kind::Custom(30078))
325            .author(self.my_pubkey)
326            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
327
328        match self.client.subscribe(vec![filter], None).await {
329            Ok(_) => {
330                info!(
331                    "Subscribed to own trees for {}",
332                    self.my_pubkey.to_bech32().unwrap_or_default()
333                );
334            }
335            Err(e) => {
336                warn!("Failed to subscribe to own trees (will retry on reconnect): {}", e);
337            }
338        }
339
340        Ok(())
341    }
342
343    /// Subscribe to followed users' trees
344    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
345        // Load contacts from file
346        let contacts: Vec<String> = if contacts_file.exists() {
347            let data = std::fs::read_to_string(contacts_file)?;
348            serde_json::from_str(&data).unwrap_or_default()
349        } else {
350            Vec::new()
351        };
352
353        if contacts.is_empty() {
354            info!("No contacts to subscribe to");
355            return Ok(());
356        }
357
358        // Convert hex pubkeys to PublicKey
359        let pubkeys: Vec<PublicKey> = contacts
360            .iter()
361            .filter_map(|hex| PublicKey::from_hex(hex).ok())
362            .collect();
363
364        if pubkeys.is_empty() {
365            return Ok(());
366        }
367
368        // Subscribe to all followed users' hashtree events
369        let filter = Filter::new()
370            .kind(Kind::Custom(30078))
371            .authors(pubkeys.clone())
372            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
373
374        match self.client.subscribe(vec![filter], None).await {
375            Ok(_) => {
376                info!("Subscribed to {} followed users' trees", pubkeys.len());
377            }
378            Err(e) => {
379                warn!("Failed to subscribe to followed trees (will retry on reconnect): {}", e);
380            }
381        }
382
383        Ok(())
384    }
385
    /// Handle incoming tree event
    ///
    /// Validates that the event is a kind-30078 hashtree announcement,
    /// extracts the tree name (d-tag), root hash, and optional decryption
    /// key, and — if the announced root differs from what we last saw for
    /// this tree — records the new CID and enqueues a priority-ordered
    /// [`SyncTask`]. Malformed events are dropped silently.
    async fn handle_tree_event(
        &self,
        event: &Event,
        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
    ) {
        // Check if it's a hashtree event: must carry an ["l", "hashtree"] tag
        let has_hashtree_tag = event.tags.iter().any(|tag| {
            let v = tag.as_slice();
            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
        });

        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
            return;
        }

        // Extract d-tag (tree name)
        let d_tag = event.tags.iter().find_map(|tag| {
            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
                Some(id.clone())
            } else {
                None
            }
        });

        let tree_name = match d_tag {
            Some(name) => name,
            // No name means we can't build a subscription key; ignore.
            None => return,
        };

        // Extract hash and key from tags ("hash" and "key" tag names)
        let mut hash_hex: Option<String> = None;
        let mut key_hex: Option<String> = None;

        for tag in event.tags.iter() {
            let tag_vec = tag.as_slice();
            if tag_vec.len() >= 2 {
                match tag_vec[0].as_str() {
                    "hash" => hash_hex = Some(tag_vec[1].clone()),
                    "key" => key_hex = Some(tag_vec[1].clone()),
                    _ => {}
                }
            }
        }

        // A decodable root hash is mandatory; drop malformed events.
        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
            Some(h) => h,
            None => return,
        };

        // Optional decryption key: must decode to exactly 32 bytes,
        // otherwise it is treated as absent.
        let key = key_hex.and_then(|k| {
            let bytes = hex::decode(&k).ok()?;
            if bytes.len() == 32 {
                let mut arr = [0u8; 32];
                arr.copy_from_slice(&bytes);
                Some(arr)
            } else {
                None
            }
        });

        // size: 0 — the tags parsed here carry no size information
        let cid = Cid { hash, key, size: 0 };

        // Build key ("npub.../treename"; falls back to hex pubkey if bech32 fails).
        // Note: shadows the decryption-key binding above, which is already in `cid`.
        let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex());
        let key = format!("{}/{}", npub, tree_name);

        // Determine priority: our own events outrank followed users'
        let priority = if event.pubkey == self.my_pubkey {
            SyncPriority::Own
        } else {
            SyncPriority::Followed
        };

        // Check if we need to sync (the announced root hash changed)
        let should_sync = {
            let mut subs = subscriptions.write().await;
            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
                key: key.clone(),
                current_cid: None,
                priority,
                last_synced: None,
            });

            // Check if CID changed — also true on first sighting, since
            // current_cid starts as None.
            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
            if changed {
                sub.current_cid = Some(cid.clone());
                true
            } else {
                false
            }
        };

        if should_sync {
            info!("New tree update: {} -> {}", key, to_hex(&cid.hash)[..12].to_string());

            // Add to sync queue
            let task = SyncTask {
                key,
                cid,
                priority,
                queued_at: Instant::now(),
            };

            let mut q = queue.write().await;

            // Insert based on priority (own trees first): position before the
            // first queued task whose priority is strictly lower.
            let insert_pos = q
                .iter()
                .position(|t| t.priority > task.priority)
                .unwrap_or(q.len());
            q.insert(insert_pos, task);
        }
    }
502
503    /// Signal shutdown
504    pub fn shutdown(&self) {
505        let _ = self.shutdown_tx.send(true);
506    }
507
508    /// Queue a manual sync for a specific tree
509    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
510        let task = SyncTask {
511            key: key.to_string(),
512            cid,
513            priority,
514            queued_at: Instant::now(),
515        };
516
517        let mut q = self.queue.write().await;
518        let insert_pos = q
519            .iter()
520            .position(|t| t.priority > task.priority)
521            .unwrap_or(q.len());
522        q.insert(insert_pos, task);
523    }
524
525    /// Get current sync status
526    pub async fn status(&self) -> SyncStatus {
527        let subscriptions = self.subscriptions.read().await;
528        let queue = self.queue.read().await;
529        let syncing = self.syncing.read().await;
530
531        SyncStatus {
532            subscribed_trees: subscriptions.len(),
533            queued_tasks: queue.len(),
534            active_syncs: syncing.len(),
535        }
536    }
537}
538
/// Overall sync status
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees currently tracked in the subscriptions map
    pub subscribed_trees: usize,
    /// Number of sync tasks waiting in the queue
    pub queued_tasks: usize,
    /// Number of syncs currently in flight
    pub active_syncs: usize,
}