// hashtree_cli/sync.rs
1//! Background sync service for auto-pulling trees from Nostr
2//!
3//! Subscribes to:
4//! 1. Own trees (all visibility levels) - highest priority
5//! 2. Followed users' public trees - lower priority
6//!
7//! Uses WebRTC peers first, falls back to Blossom HTTP servers
8
9use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
21use crate::webrtc::WebRTCState;
22
/// Sync priority levels
///
/// The derived `Ord` follows the discriminant values, so a *lower* number
/// means a *higher* priority (`Own < Followed`). The sync queue relies on
/// this ordering when inserting tasks ahead of lower-priority ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Own trees - highest priority
    Own = 0,
    /// Followed users' trees - lower priority
    Followed = 1,
}
31
/// A tree to sync
///
/// Tasks are held in a priority-ordered `VecDeque`: own trees are inserted
/// ahead of followed users' trees (see the insertion logic in
/// `handle_tree_event` / `queue_sync`).
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Nostr key (npub.../treename)
    pub key: String,
    /// Content identifier
    pub cid: Cid,
    /// Priority level
    pub priority: SyncPriority,
    /// When this task was queued
    pub queued_at: Instant,
}
44
/// Configuration for background sync
///
/// Built either with library defaults (`Default`) or from the user's
/// `config.toml` via [`SyncConfig::from_config`].
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Enable syncing own trees
    pub sync_own: bool,
    /// Enable syncing followed users' public trees
    pub sync_followed: bool,
    /// Nostr relays for subscriptions
    pub relays: Vec<String>,
    /// Max concurrent sync tasks
    pub max_concurrent: usize,
    /// Timeout for WebRTC requests (ms)
    pub webrtc_timeout_ms: u64,
    /// Timeout for Blossom requests (ms)
    pub blossom_timeout_ms: u64,
}
61
62impl Default for SyncConfig {
63    fn default() -> Self {
64        Self {
65            sync_own: true,
66            sync_followed: true,
67            relays: hashtree_config::DEFAULT_RELAYS
68                .iter()
69                .map(|s| s.to_string())
70                .collect(),
71            max_concurrent: 3,
72            webrtc_timeout_ms: 2000,
73            blossom_timeout_ms: 10000,
74        }
75    }
76}
77
78impl SyncConfig {
79    /// Create from hashtree_config (respects user's config.toml)
80    pub fn from_config(config: &hashtree_config::Config) -> Self {
81        Self {
82            sync_own: true,
83            sync_followed: true,
84            relays: config.nostr.relays.clone(),
85            max_concurrent: 3,
86            webrtc_timeout_ms: 2000,
87            blossom_timeout_ms: 10000,
88        }
89    }
90}
91
/// State for a subscribed tree
///
/// One entry per `npub/treename` key, tracking the last CID announced on
/// Nostr so duplicate announcements don't re-queue a sync.
#[allow(dead_code)]
struct TreeSubscription {
    // Nostr key in "npub.../treename" form (duplicates the map key).
    key: String,
    // Last CID seen for this tree; `None` until the first event arrives.
    current_cid: Option<Cid>,
    // Priority assigned when the subscription was created.
    priority: SyncPriority,
    // When this tree last finished syncing, if ever.
    last_synced: Option<Instant>,
}
100
/// Background sync service
///
/// Owns the Nostr client plus the shared state (subscriptions, queue,
/// in-flight set) that the spawned worker task and the notification loop
/// both touch. All shared state is behind `Arc<RwLock<...>>` so clones can
/// be moved into spawned tasks.
pub struct BackgroundSync {
    config: SyncConfig,
    store: Arc<HashtreeStore>,
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client for subscriptions
    client: Client,
    /// Our public key
    my_pubkey: PublicKey,
    /// Subscribed trees
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Sync queue
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Currently syncing hashes
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Shutdown signal
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Fetcher for remote content
    fetcher: Arc<Fetcher>,
}
122
impl BackgroundSync {
    /// Create a new background sync service
    ///
    /// Adds and connects the configured relays (failures to add a relay are
    /// logged and skipped, not fatal) and builds the fetcher from the
    /// configured timeouts. Does not start any background work — call
    /// [`BackgroundSync::run`] for that.
    pub async fn new(
        config: SyncConfig,
        store: Arc<HashtreeStore>,
        keys: Keys,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Result<Self> {
        let my_pubkey = keys.public_key();
        let client = Client::new(keys);

        // Add relays
        for relay in &config.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }

        // Connect to relays
        client.connect().await;

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        // Create fetcher with config
        // BlossomClient auto-loads servers from ~/.hashtree/config.toml
        let fetch_config = FetchConfig {
            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
        };
        let fetcher = Arc::new(Fetcher::new(fetch_config));

        Ok(Self {
            config,
            store,
            webrtc_state,
            client,
            my_pubkey,
            subscriptions: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(RwLock::new(VecDeque::new())),
            syncing: Arc::new(RwLock::new(HashSet::new())),
            shutdown_tx,
            shutdown_rx,
            fetcher,
        })
    }

    /// Start the background sync service
    ///
    /// Sets up Nostr subscriptions, spawns the sync-worker task, then loops
    /// processing relay notifications until shutdown is signalled or the
    /// notification channel errors. This future does not resolve until
    /// shutdown — run it in its own task.
    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
        info!("Starting background sync service");

        // Wait for relays to connect before subscribing
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Subscribe to own trees
        if self.config.sync_own {
            self.subscribe_own_trees().await?;
        }

        // Subscribe to followed users' trees
        if self.config.sync_followed {
            self.subscribe_followed_trees(&contacts_file).await?;
        }

        // Start sync worker
        // Clone the shared handles that the worker task needs; the task
        // takes ownership of the clones.
        let queue = self.queue.clone();
        let syncing = self.syncing.clone();
        let store = self.store.clone();
        let webrtc_state = self.webrtc_state.clone();
        let fetcher = self.fetcher.clone();
        let max_concurrent = self.config.max_concurrent;
        let mut shutdown_rx = self.shutdown_rx.clone();

        // Spawn sync worker task
        tokio::spawn(async move {
            // Poll the queue twice a second; each tick dequeues at most one
            // task, so throughput ramps up gradually rather than in bursts.
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            info!("Sync worker shutting down");
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        // Check if we can start more sync tasks
                        let current_syncing = syncing.read().await.len();
                        if current_syncing >= max_concurrent {
                            continue;
                        }

                        // Get next task from queue
                        // (write lock held only for the pop, then dropped)
                        let task = {
                            let mut q = queue.write().await;
                            q.pop_front()
                        };

                        if let Some(task) = task {
                            let hash_hex = to_hex(&task.cid.hash);

                            // Check if already syncing
                            // (check-and-insert under a single write lock so
                            // two ticks can't both claim the same hash)
                            {
                                let mut s = syncing.write().await;
                                if s.contains(&hash_hex) {
                                    continue;
                                }
                                s.insert(hash_hex.clone());
                            }

                            // Spawn sync task
                            let syncing_clone = syncing.clone();
                            let store_clone = store.clone();
                            let webrtc_clone = webrtc_state.clone();
                            let fetcher_clone = fetcher.clone();

                            tokio::spawn(async move {
                                // Fetcher tries WebRTC peers first, then
                                // falls back to Blossom HTTP (see module doc).
                                let result = fetcher_clone.fetch_tree(
                                    &store_clone,
                                    webrtc_clone.as_ref(),
                                    &task.cid.hash,
                                ).await;

                                match result {
                                    Ok((chunks_fetched, bytes_fetched)) => {
                                        if chunks_fetched > 0 {
                                            info!(
                                                "Synced tree {} ({} chunks, {} bytes)",
                                                &hash_hex[..12],
                                                chunks_fetched,
                                                bytes_fetched
                                            );

                                            // Index the tree for eviction tracking
                                            // Extract owner from key (format: "npub.../treename" or "pubkey/treename")
                                            let (owner, name) = task.key.split_once('/')
                                                .map(|(o, n)| (o.to_string(), Some(n)))
                                                .unwrap_or((task.key.clone(), None));

                                            // Map SyncPriority to storage priority
                                            let storage_priority = match task.priority {
                                                SyncPriority::Own => PRIORITY_OWN,
                                                SyncPriority::Followed => PRIORITY_FOLLOWED,
                                            };

                                            if let Err(e) = store_clone.index_tree(
                                                &task.cid.hash,
                                                &owner,
                                                name,
                                                storage_priority,
                                                Some(&task.key), // ref_key for replacing old versions
                                            ) {
                                                warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
                                            }

                                            // Check if eviction is needed
                                            if let Err(e) = store_clone.evict_if_needed() {
                                                warn!("Eviction check failed: {}", e);
                                            }
                                        } else {
                                            // Zero chunks fetched means everything
                                            // was already present locally.
                                            tracing::debug!("Tree {} already synced", &hash_hex[..12]);
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
                                    }
                                }

                                // Remove from syncing set
                                // (always runs, success or failure, so a hash
                                // can be retried on a later announcement)
                                syncing_clone.write().await.remove(&hash_hex);
                            });
                        }
                    }
                }
            }
        });

        // Handle Nostr notifications for tree updates
        let mut notifications = self.client.notifications();
        let subscriptions = self.subscriptions.clone();
        let queue = self.queue.clone();
        let mut shutdown_rx = self.shutdown_rx.clone();

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("Background sync shutting down");
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_tree_event(&event, &subscriptions, &queue).await;
                        }
                        Ok(_) => {}
                        Err(e) => {
                            // Channel error (e.g. client shut down): exit the
                            // loop rather than spin on a dead receiver.
                            error!("Notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Subscribe to own trees (kind 30078 events from our pubkey)
    ///
    /// Filters on the `l=hashtree` label tag. Subscription failure is logged
    /// but not propagated, so startup continues without this feed.
    async fn subscribe_own_trees(&self) -> Result<()> {
        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .author(self.my_pubkey)
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!(
                    "Subscribed to own trees for {}",
                    self.my_pubkey.to_bech32().unwrap_or_default()
                );
            }
            Err(e) => {
                warn!(
                    "Failed to subscribe to own trees (will retry on reconnect): {}",
                    e
                );
            }
        }

        Ok(())
    }

    /// Subscribe to followed users' trees
    ///
    /// Loads a JSON array of hex pubkeys from `contacts_file`; a missing or
    /// unparseable file yields an empty contact list (not an error). Invalid
    /// pubkey entries are silently skipped. Returns `Err` only on I/O failure
    /// reading an existing file.
    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
        // Load contacts from file
        let contacts: Vec<String> = if contacts_file.exists() {
            let data = std::fs::read_to_string(contacts_file)?;
            serde_json::from_str(&data).unwrap_or_default()
        } else {
            Vec::new()
        };

        if contacts.is_empty() {
            info!("No contacts to subscribe to");
            return Ok(());
        }

        // Convert hex pubkeys to PublicKey
        let pubkeys: Vec<PublicKey> = contacts
            .iter()
            .filter_map(|hex| PublicKey::from_hex(hex).ok())
            .collect();

        if pubkeys.is_empty() {
            return Ok(());
        }

        // Subscribe to all followed users' hashtree events
        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .authors(pubkeys.clone())
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!("Subscribed to {} followed users' trees", pubkeys.len());
            }
            Err(e) => {
                warn!(
                    "Failed to subscribe to followed trees (will retry on reconnect): {}",
                    e
                );
            }
        }

        Ok(())
    }

    /// Handle incoming tree event
    ///
    /// Validates the event (kind 30078 with an `l=hashtree` tag), extracts
    /// the tree name (`d` tag), content hash (`hash` tag) and optional
    /// 32-byte decryption key (`key` tag), and queues a sync task if the
    /// announced CID differs from the one we last saw for this tree.
    /// Malformed events are dropped silently.
    async fn handle_tree_event(
        &self,
        event: &Event,
        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
    ) {
        // Check if it's a hashtree event
        let has_hashtree_tag = event.tags.iter().any(|tag| {
            let v = tag.as_slice();
            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
        });

        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
            return;
        }

        // Extract d-tag (tree name)
        let d_tag = event.tags.iter().find_map(|tag| {
            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
                Some(id.clone())
            } else {
                None
            }
        });

        let tree_name = match d_tag {
            Some(name) => name,
            None => return,
        };

        // Extract hash and key from tags
        let mut hash_hex: Option<String> = None;
        let mut key_hex: Option<String> = None;

        for tag in event.tags.iter() {
            let tag_vec = tag.as_slice();
            if tag_vec.len() >= 2 {
                match tag_vec[0].as_str() {
                    "hash" => hash_hex = Some(tag_vec[1].clone()),
                    "key" => key_hex = Some(tag_vec[1].clone()),
                    _ => {}
                }
            }
        }

        // No valid hash tag -> nothing to sync.
        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
            Some(h) => h,
            None => return,
        };

        // Optional encryption key: must decode to exactly 32 bytes,
        // otherwise it is ignored.
        let key = key_hex.and_then(|k| {
            let bytes = hex::decode(&k).ok()?;
            if bytes.len() == 32 {
                let mut arr = [0u8; 32];
                arr.copy_from_slice(&bytes);
                Some(arr)
            } else {
                None
            }
        });

        let cid = Cid { hash, key };

        // Build key
        // (prefer bech32 npub; fall back to hex if encoding fails)
        let npub = event
            .pubkey
            .to_bech32()
            .unwrap_or_else(|_| event.pubkey.to_hex());
        let key = format!("{}/{}", npub, tree_name);

        // Determine priority
        let priority = if event.pubkey == self.my_pubkey {
            SyncPriority::Own
        } else {
            SyncPriority::Followed
        };

        // Check if we need to sync
        // (lock scope ends at the closing brace so the queue lock below is
        // never held at the same time as the subscriptions lock)
        let should_sync = {
            let mut subs = subscriptions.write().await;
            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
                key: key.clone(),
                current_cid: None,
                priority,
                last_synced: None,
            });

            // Check if CID changed
            // (only the hash is compared; a key-only change does not trigger
            // a re-sync)
            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
            if changed {
                sub.current_cid = Some(cid.clone());
                true
            } else {
                false
            }
        };

        if should_sync {
            info!(
                "New tree update: {} -> {}",
                key,
                to_hex(&cid.hash)[..12].to_string()
            );

            // Add to sync queue
            let task = SyncTask {
                key,
                cid,
                priority,
                queued_at: Instant::now(),
            };

            let mut q = queue.write().await;

            // Insert based on priority (own trees first)
            // (first position with strictly lower priority; equal-priority
            // tasks keep FIFO order)
            let insert_pos = q
                .iter()
                .position(|t| t.priority > task.priority)
                .unwrap_or(q.len());
            q.insert(insert_pos, task);
        }
    }

    /// Signal shutdown
    ///
    /// Best-effort: the send error (no receivers left) is deliberately
    /// ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }

    /// Queue a manual sync for a specific tree
    ///
    /// Bypasses the Nostr subscription path; the task is inserted with the
    /// same priority ordering used by `handle_tree_event`.
    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
        let task = SyncTask {
            key: key.to_string(),
            cid,
            priority,
            queued_at: Instant::now(),
        };

        let mut q = self.queue.write().await;
        let insert_pos = q
            .iter()
            .position(|t| t.priority > task.priority)
            .unwrap_or(q.len());
        q.insert(insert_pos, task);
    }

    /// Get current sync status
    ///
    /// Snapshot of counts at the moment the three read locks are taken; the
    /// values may be slightly stale by the time the caller sees them.
    pub async fn status(&self) -> SyncStatus {
        let subscriptions = self.subscriptions.read().await;
        let queue = self.queue.read().await;
        let syncing = self.syncing.read().await;

        SyncStatus {
            subscribed_trees: subscriptions.len(),
            queued_tasks: queue.len(),
            active_syncs: syncing.len(),
        }
    }
}
561
/// Overall sync status
///
/// Point-in-time counters returned by [`BackgroundSync::status`].
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees we are tracking (own + followed).
    pub subscribed_trees: usize,
    /// Tasks waiting in the sync queue.
    pub queued_tasks: usize,
    /// Syncs currently in flight.
    pub active_syncs: usize,
}