hashtree_cli/
sync.rs

1//! Background sync service for auto-pulling trees from Nostr
2//!
3//! Subscribes to:
4//! 1. Own trees (all visibility levels) - highest priority
5//! 2. Followed users' public trees - lower priority
6//!
7//! Uses WebRTC peers first, falls back to Blossom HTTP servers
8
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Result;
use hashtree_core::{from_hex, to_hex, Cid};
use nostr_sdk::prelude::*;
use tokio::sync::RwLock;
use tracing::{error, info, warn};

use crate::fetch::{FetchConfig, Fetcher};
use crate::storage::{HashtreeStore, PRIORITY_OWN, PRIORITY_FOLLOWED};
use crate::webrtc::WebRTCState;
22
/// Sync priority levels
///
/// Lower discriminant = higher priority. The derived `Ord` makes `Own`
/// sort before `Followed`; the sync queue's ordered insert
/// (`t.priority > task.priority`) relies on exactly this ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Own trees - highest priority
    Own = 0,
    /// Followed users' trees - lower priority
    Followed = 1,
}
31
/// A tree to sync
///
/// Queued by the Nostr event handler (or manually via
/// `BackgroundSync::queue_sync`) and drained by the sync worker.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Tree key in "npub.../treename" form (owner pubkey + tree name)
    pub key: String,
    /// Content identifier (root hash plus optional 32-byte key)
    pub cid: Cid,
    /// Priority level (own trees are placed ahead of followed users')
    pub priority: SyncPriority,
    /// When this task was queued
    pub queued_at: Instant,
}
44
/// Configuration for background sync
///
/// Construct via `Default` (bundled relay list) or
/// `SyncConfig::from_config` (relays from the user's config.toml).
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Enable syncing own trees
    pub sync_own: bool,
    /// Enable syncing followed users' public trees
    pub sync_followed: bool,
    /// Nostr relays for subscriptions
    pub relays: Vec<String>,
    /// Max concurrent sync tasks (worker dispatch cap)
    pub max_concurrent: usize,
    /// Timeout for WebRTC requests (ms), passed to the fetcher
    pub webrtc_timeout_ms: u64,
    /// Timeout for Blossom requests (ms), passed to the fetcher
    pub blossom_timeout_ms: u64,
}
61
62impl Default for SyncConfig {
63    fn default() -> Self {
64        Self {
65            sync_own: true,
66            sync_followed: true,
67            relays: hashtree_config::DEFAULT_RELAYS
68                .iter()
69                .map(|s| s.to_string())
70                .collect(),
71            max_concurrent: 3,
72            webrtc_timeout_ms: 2000,
73            blossom_timeout_ms: 10000,
74        }
75    }
76}
77
78impl SyncConfig {
79    /// Create from hashtree_config (respects user's config.toml)
80    pub fn from_config(config: &hashtree_config::Config) -> Self {
81        Self {
82            sync_own: true,
83            sync_followed: true,
84            relays: config.nostr.relays.clone(),
85            max_concurrent: 3,
86            webrtc_timeout_ms: 2000,
87            blossom_timeout_ms: 10000,
88        }
89    }
90}
91
/// State for a subscribed tree
///
/// One entry per tree we have seen an announcement for, keyed in the
/// subscriptions map by the same "npub.../treename" string stored in `key`.
#[allow(dead_code)]
struct TreeSubscription {
    // Tree key, "npub.../treename" (duplicates the map key for convenience)
    key: String,
    // Last CID announced for this tree; None until the first event arrives
    current_cid: Option<Cid>,
    // Priority assigned when the tree was first seen (own vs followed)
    priority: SyncPriority,
    // NOTE(review): never written in this file (always None) — confirm
    // whether completion timestamps were meant to be recorded here
    last_synced: Option<Instant>,
}
100
/// Background sync service
///
/// Subscribes to hashtree announcements (kind 30078) over Nostr and pulls
/// updated trees into local storage via a bounded-concurrency worker queue.
pub struct BackgroundSync {
    /// Sync behaviour settings (relays, concurrency cap, fetch timeouts)
    config: SyncConfig,
    /// Local store that fetched chunks are written into and indexed against
    store: Arc<HashtreeStore>,
    /// Optional WebRTC state; when present, peers are tried before Blossom
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client for subscriptions
    client: Client,
    /// Our public key
    my_pubkey: PublicKey,
    /// Subscribed trees, keyed by "npub.../treename"
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Sync queue, kept ordered by priority (own trees first)
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Hex hashes of trees currently being fetched (de-dupe guard)
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Shutdown signal
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Fetcher for remote content
    fetcher: Arc<Fetcher>,
}
122
123impl BackgroundSync {
124    /// Create a new background sync service
125    pub async fn new(
126        config: SyncConfig,
127        store: Arc<HashtreeStore>,
128        keys: Keys,
129        webrtc_state: Option<Arc<WebRTCState>>,
130    ) -> Result<Self> {
131        let my_pubkey = keys.public_key();
132        let client = Client::new(keys);
133
134        // Add relays
135        for relay in &config.relays {
136            if let Err(e) = client.add_relay(relay).await {
137                warn!("Failed to add relay {}: {}", relay, e);
138            }
139        }
140
141        // Connect to relays
142        client.connect().await;
143
144        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
145
146        // Create fetcher with config
147        // BlossomClient auto-loads servers from ~/.hashtree/config.toml
148        let fetch_config = FetchConfig {
149            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
150            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
151        };
152        let fetcher = Arc::new(Fetcher::new(fetch_config));
153
154        Ok(Self {
155            config,
156            store,
157            webrtc_state,
158            client,
159            my_pubkey,
160            subscriptions: Arc::new(RwLock::new(HashMap::new())),
161            queue: Arc::new(RwLock::new(VecDeque::new())),
162            syncing: Arc::new(RwLock::new(HashSet::new())),
163            shutdown_tx,
164            shutdown_rx,
165            fetcher,
166        })
167    }
168
169    /// Start the background sync service
170    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
171        info!("Starting background sync service");
172
173        // Wait for relays to connect before subscribing
174        tokio::time::sleep(Duration::from_secs(3)).await;
175
176        // Subscribe to own trees
177        if self.config.sync_own {
178            self.subscribe_own_trees().await?;
179        }
180
181        // Subscribe to followed users' trees
182        if self.config.sync_followed {
183            self.subscribe_followed_trees(&contacts_file).await?;
184        }
185
186        // Start sync worker
187        let queue = self.queue.clone();
188        let syncing = self.syncing.clone();
189        let store = self.store.clone();
190        let webrtc_state = self.webrtc_state.clone();
191        let fetcher = self.fetcher.clone();
192        let max_concurrent = self.config.max_concurrent;
193        let mut shutdown_rx = self.shutdown_rx.clone();
194
195        // Spawn sync worker task
196        tokio::spawn(async move {
197            let mut interval = tokio::time::interval(Duration::from_millis(500));
198
199            loop {
200                tokio::select! {
201                    _ = shutdown_rx.changed() => {
202                        if *shutdown_rx.borrow() {
203                            info!("Sync worker shutting down");
204                            break;
205                        }
206                    }
207                    _ = interval.tick() => {
208                        // Check if we can start more sync tasks
209                        let current_syncing = syncing.read().await.len();
210                        if current_syncing >= max_concurrent {
211                            continue;
212                        }
213
214                        // Get next task from queue
215                        let task = {
216                            let mut q = queue.write().await;
217                            q.pop_front()
218                        };
219
220                        if let Some(task) = task {
221                            let hash_hex = to_hex(&task.cid.hash);
222
223                            // Check if already syncing
224                            {
225                                let mut s = syncing.write().await;
226                                if s.contains(&hash_hex) {
227                                    continue;
228                                }
229                                s.insert(hash_hex.clone());
230                            }
231
232                            // Spawn sync task
233                            let syncing_clone = syncing.clone();
234                            let store_clone = store.clone();
235                            let webrtc_clone = webrtc_state.clone();
236                            let fetcher_clone = fetcher.clone();
237
238                            tokio::spawn(async move {
239                                let result = fetcher_clone.fetch_tree(
240                                    &store_clone,
241                                    webrtc_clone.as_ref(),
242                                    &task.cid.hash,
243                                ).await;
244
245                                match result {
246                                    Ok((chunks_fetched, bytes_fetched)) => {
247                                        if chunks_fetched > 0 {
248                                            info!(
249                                                "Synced tree {} ({} chunks, {} bytes)",
250                                                &hash_hex[..12],
251                                                chunks_fetched,
252                                                bytes_fetched
253                                            );
254
255                                            // Index the tree for eviction tracking
256                                            // Extract owner from key (format: "npub.../treename" or "pubkey/treename")
257                                            let (owner, name) = task.key.split_once('/')
258                                                .map(|(o, n)| (o.to_string(), Some(n)))
259                                                .unwrap_or((task.key.clone(), None));
260
261                                            // Map SyncPriority to storage priority
262                                            let storage_priority = match task.priority {
263                                                SyncPriority::Own => PRIORITY_OWN,
264                                                SyncPriority::Followed => PRIORITY_FOLLOWED,
265                                            };
266
267                                            if let Err(e) = store_clone.index_tree(
268                                                &task.cid.hash,
269                                                &owner,
270                                                name,
271                                                storage_priority,
272                                                Some(&task.key), // ref_key for replacing old versions
273                                            ) {
274                                                warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
275                                            }
276
277                                            // Check if eviction is needed
278                                            if let Err(e) = store_clone.evict_if_needed() {
279                                                warn!("Eviction check failed: {}", e);
280                                            }
281                                        } else {
282                                            tracing::debug!("Tree {} already synced", &hash_hex[..12]);
283                                        }
284                                    }
285                                    Err(e) => {
286                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
287                                    }
288                                }
289
290                                // Remove from syncing set
291                                syncing_clone.write().await.remove(&hash_hex);
292                            });
293                        }
294                    }
295                }
296            }
297        });
298
299        // Handle Nostr notifications for tree updates
300        let mut notifications = self.client.notifications();
301        let subscriptions = self.subscriptions.clone();
302        let queue = self.queue.clone();
303        let mut shutdown_rx = self.shutdown_rx.clone();
304
305        loop {
306            tokio::select! {
307                _ = shutdown_rx.changed() => {
308                    if *shutdown_rx.borrow() {
309                        info!("Background sync shutting down");
310                        break;
311                    }
312                }
313                notification = notifications.recv() => {
314                    match notification {
315                        Ok(RelayPoolNotification::Event { event, .. }) => {
316                            self.handle_tree_event(&event, &subscriptions, &queue).await;
317                        }
318                        Ok(_) => {}
319                        Err(e) => {
320                            error!("Notification error: {}", e);
321                            break;
322                        }
323                    }
324                }
325            }
326        }
327
328        Ok(())
329    }
330
331    /// Subscribe to own trees (kind 30078 events from our pubkey)
332    async fn subscribe_own_trees(&self) -> Result<()> {
333        let filter = Filter::new()
334            .kind(Kind::Custom(30078))
335            .author(self.my_pubkey)
336            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
337
338        match self.client.subscribe(vec![filter], None).await {
339            Ok(_) => {
340                info!(
341                    "Subscribed to own trees for {}",
342                    self.my_pubkey.to_bech32().unwrap_or_default()
343                );
344            }
345            Err(e) => {
346                warn!("Failed to subscribe to own trees (will retry on reconnect): {}", e);
347            }
348        }
349
350        Ok(())
351    }
352
353    /// Subscribe to followed users' trees
354    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
355        // Load contacts from file
356        let contacts: Vec<String> = if contacts_file.exists() {
357            let data = std::fs::read_to_string(contacts_file)?;
358            serde_json::from_str(&data).unwrap_or_default()
359        } else {
360            Vec::new()
361        };
362
363        if contacts.is_empty() {
364            info!("No contacts to subscribe to");
365            return Ok(());
366        }
367
368        // Convert hex pubkeys to PublicKey
369        let pubkeys: Vec<PublicKey> = contacts
370            .iter()
371            .filter_map(|hex| PublicKey::from_hex(hex).ok())
372            .collect();
373
374        if pubkeys.is_empty() {
375            return Ok(());
376        }
377
378        // Subscribe to all followed users' hashtree events
379        let filter = Filter::new()
380            .kind(Kind::Custom(30078))
381            .authors(pubkeys.clone())
382            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
383
384        match self.client.subscribe(vec![filter], None).await {
385            Ok(_) => {
386                info!("Subscribed to {} followed users' trees", pubkeys.len());
387            }
388            Err(e) => {
389                warn!("Failed to subscribe to followed trees (will retry on reconnect): {}", e);
390            }
391        }
392
393        Ok(())
394    }
395
396    /// Handle incoming tree event
397    async fn handle_tree_event(
398        &self,
399        event: &Event,
400        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
401        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
402    ) {
403        // Check if it's a hashtree event
404        let has_hashtree_tag = event.tags.iter().any(|tag| {
405            let v = tag.as_slice();
406            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
407        });
408
409        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
410            return;
411        }
412
413        // Extract d-tag (tree name)
414        let d_tag = event.tags.iter().find_map(|tag| {
415            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
416                Some(id.clone())
417            } else {
418                None
419            }
420        });
421
422        let tree_name = match d_tag {
423            Some(name) => name,
424            None => return,
425        };
426
427        // Extract hash and key from tags
428        let mut hash_hex: Option<String> = None;
429        let mut key_hex: Option<String> = None;
430
431        for tag in event.tags.iter() {
432            let tag_vec = tag.as_slice();
433            if tag_vec.len() >= 2 {
434                match tag_vec[0].as_str() {
435                    "hash" => hash_hex = Some(tag_vec[1].clone()),
436                    "key" => key_hex = Some(tag_vec[1].clone()),
437                    _ => {}
438                }
439            }
440        }
441
442        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
443            Some(h) => h,
444            None => return,
445        };
446
447        let key = key_hex.and_then(|k| {
448            let bytes = hex::decode(&k).ok()?;
449            if bytes.len() == 32 {
450                let mut arr = [0u8; 32];
451                arr.copy_from_slice(&bytes);
452                Some(arr)
453            } else {
454                None
455            }
456        });
457
458        let cid = Cid { hash, key };
459
460        // Build key
461        let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex());
462        let key = format!("{}/{}", npub, tree_name);
463
464        // Determine priority
465        let priority = if event.pubkey == self.my_pubkey {
466            SyncPriority::Own
467        } else {
468            SyncPriority::Followed
469        };
470
471        // Check if we need to sync
472        let should_sync = {
473            let mut subs = subscriptions.write().await;
474            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
475                key: key.clone(),
476                current_cid: None,
477                priority,
478                last_synced: None,
479            });
480
481            // Check if CID changed
482            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
483            if changed {
484                sub.current_cid = Some(cid.clone());
485                true
486            } else {
487                false
488            }
489        };
490
491        if should_sync {
492            info!("New tree update: {} -> {}", key, to_hex(&cid.hash)[..12].to_string());
493
494            // Add to sync queue
495            let task = SyncTask {
496                key,
497                cid,
498                priority,
499                queued_at: Instant::now(),
500            };
501
502            let mut q = queue.write().await;
503
504            // Insert based on priority (own trees first)
505            let insert_pos = q
506                .iter()
507                .position(|t| t.priority > task.priority)
508                .unwrap_or(q.len());
509            q.insert(insert_pos, task);
510        }
511    }
512
513    /// Signal shutdown
514    pub fn shutdown(&self) {
515        let _ = self.shutdown_tx.send(true);
516    }
517
518    /// Queue a manual sync for a specific tree
519    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
520        let task = SyncTask {
521            key: key.to_string(),
522            cid,
523            priority,
524            queued_at: Instant::now(),
525        };
526
527        let mut q = self.queue.write().await;
528        let insert_pos = q
529            .iter()
530            .position(|t| t.priority > task.priority)
531            .unwrap_or(q.len());
532        q.insert(insert_pos, task);
533    }
534
535    /// Get current sync status
536    pub async fn status(&self) -> SyncStatus {
537        let subscriptions = self.subscriptions.read().await;
538        let queue = self.queue.read().await;
539        let syncing = self.syncing.read().await;
540
541        SyncStatus {
542            subscribed_trees: subscriptions.len(),
543            queued_tasks: queue.len(),
544            active_syncs: syncing.len(),
545        }
546    }
547}
548
/// Overall sync status
///
/// Point-in-time snapshot returned by `BackgroundSync::status`.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees we have received announcements for
    pub subscribed_trees: usize,
    /// Tasks currently waiting in the sync queue
    pub queued_tasks: usize,
    /// Syncs currently in flight
    pub active_syncs: usize,
}