Skip to main content

hashtree_cli/
sync.rs

//! Background sync service for auto-pulling trees from Nostr
//!
//! Subscribes to (highest sync priority first):
//! 1. Explicitly pinned mutable refs
//! 2. Explicitly tracked authors' readable trees
//! 3. Own trees (all visibility levels)
//! 4. Followed users' public trees
//!
//! Uses WebRTC peers first, falls back to Blossom HTTP servers
8
9use anyhow::Result;
10use git_remote_htree::nostr_client::load_keys;
11use hashtree_core::{from_hex, to_hex, Cid};
12use nostr_sdk::prelude::*;
13use std::collections::{HashMap, HashSet, VecDeque};
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::RwLock;
18use tracing::{error, info, warn};
19
20use crate::fetch::{FetchConfig, Fetcher};
21use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
22use crate::webrtc::WebRTCState;
23
/// Sync priority levels.
///
/// The derived ordering follows the discriminants below, so a variant with a
/// smaller value compares as "less". The sync queue inserts a task in front of
/// the first queued task whose priority compares greater, which means
/// lower-valued variants are dequeued first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Explicitly pinned mutable refs - highest priority
    Pinned = 0,
    /// Explicitly tracked authors - mirrors all readable trees for that author
    TrackedAuthor = 1,
    /// Own trees - high priority
    Own = 2,
    /// Followed users' trees - lower priority
    Followed = 3,
}
36
/// A tree to sync: one queued request to fetch the tree rooted at `cid` and
/// then record it under `key`.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Nostr key (npub.../treename)
    pub key: String,
    /// Content identifier of the tree root to fetch
    pub cid: Cid,
    /// Priority level; decides the task's position in the sync queue
    pub priority: SyncPriority,
    /// When this task was queued (set to `Instant::now()` at creation)
    pub queued_at: Instant,
}
49
/// Configuration for background sync
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Enable syncing own trees (gates the own-tree subscription at startup)
    pub sync_own: bool,
    /// Enable syncing followed users' public trees (gates the contacts-based
    /// subscription at startup)
    pub sync_followed: bool,
    /// Nostr relays for subscriptions; each one is added to the client when the
    /// service is constructed
    pub relays: Vec<String>,
    /// Max concurrent sync tasks; the worker starts no new fetch while this
    /// many are in flight
    pub max_concurrent: usize,
    /// Timeout for WebRTC requests (ms)
    pub webrtc_timeout_ms: u64,
    /// Timeout for Blossom requests (ms)
    pub blossom_timeout_ms: u64,
}
66
67impl Default for SyncConfig {
68    fn default() -> Self {
69        Self {
70            sync_own: true,
71            sync_followed: true,
72            relays: hashtree_config::DEFAULT_RELAYS
73                .iter()
74                .map(|s| s.to_string())
75                .collect(),
76            max_concurrent: 3,
77            webrtc_timeout_ms: 2000,
78            blossom_timeout_ms: 10000,
79        }
80    }
81}
82
83impl SyncConfig {
84    /// Create from hashtree_config (respects user's config.toml)
85    pub fn from_config(config: &hashtree_config::Config) -> Self {
86        Self {
87            sync_own: true,
88            sync_followed: true,
89            relays: config.nostr.relays.clone(),
90            max_concurrent: 3,
91            webrtc_timeout_ms: 2000,
92            blossom_timeout_ms: 10000,
93        }
94    }
95}
96
/// State for a subscribed tree, kept per `npub/treename` key so that repeated
/// events announcing the same root do not queue another sync.
// `key` and `last_synced` are written but never read in the code visible here,
// hence the dead_code allowance.
#[allow(dead_code)]
struct TreeSubscription {
    /// The `npub/treename` key this entry is stored under
    key: String,
    /// Root most recently announced for this key; `None` until the first event
    current_cid: Option<Cid>,
    /// Priority the subscription was classified with
    priority: SyncPriority,
    /// Initialised to `None`; nothing in the visible code updates it
    last_synced: Option<Instant>,
}
105
106fn build_exact_tree_filter(key: &str) -> Result<Filter> {
107    let (npub, tree_name) = key
108        .split_once('/')
109        .ok_or_else(|| anyhow::anyhow!("Invalid pinned ref key: {}", key))?;
110    let author = PublicKey::from_bech32(npub)
111        .map_err(|_| anyhow::anyhow!("Invalid npub in pinned ref key: {}", key))?;
112
113    Ok(Filter::new()
114        .kind(Kind::Custom(30078))
115        .author(author)
116        .custom_tag(
117            SingleLetterTag::lowercase(Alphabet::D),
118            vec![tree_name.to_string()],
119        )
120        .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]))
121}
122
123fn build_author_tree_filter(author: PublicKey) -> Filter {
124    Filter::new()
125        .kind(Kind::Custom(30078))
126        .author(author)
127        .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"])
128}
129
130fn load_author_signing_keys() -> HashMap<String, Keys> {
131    load_keys()
132        .into_iter()
133        .filter_map(|stored| {
134            let secret_hex = stored.secret_hex?;
135            let secret_bytes = hex::decode(&secret_hex).ok()?;
136            let secret = SecretKey::from_slice(&secret_bytes).ok()?;
137            Some((stored.pubkey_hex, Keys::new(secret)))
138        })
139        .collect()
140}
141
142fn cid_from_tree_event(event: &Event, author_keys: Option<&Keys>) -> Option<Cid> {
143    let mut hash_hex: Option<String> = None;
144    let mut key_hex: Option<String> = None;
145    let mut encrypted_key: Option<String> = None;
146    let mut self_encrypted_key: Option<String> = None;
147
148    for tag in event.tags.iter() {
149        let tag_vec = tag.as_slice();
150        if tag_vec.len() < 2 {
151            continue;
152        }
153
154        match tag_vec[0].as_str() {
155            "hash" => hash_hex = Some(tag_vec[1].clone()),
156            "key" => key_hex = Some(tag_vec[1].clone()),
157            "encryptedKey" => encrypted_key = Some(tag_vec[1].clone()),
158            "selfEncryptedKey" => self_encrypted_key = Some(tag_vec[1].clone()),
159            _ => {}
160        }
161    }
162
163    let hash = from_hex(&hash_hex?).ok()?;
164
165    if let Some(key_hex) = key_hex {
166        let bytes = hex::decode(&key_hex).ok()?;
167        if bytes.len() != 32 {
168            return None;
169        }
170        let mut key = [0u8; 32];
171        key.copy_from_slice(&bytes);
172        return Some(Cid {
173            hash,
174            key: Some(key),
175        });
176    }
177
178    if let Some(ciphertext) = self_encrypted_key {
179        let keys = author_keys?;
180        if keys.public_key() != event.pubkey {
181            return None;
182        }
183        let key_hex = nip44::decrypt(keys.secret_key(), &event.pubkey, &ciphertext).ok()?;
184        let bytes = hex::decode(&key_hex).ok()?;
185        if bytes.len() != 32 {
186            return None;
187        }
188        let mut key = [0u8; 32];
189        key.copy_from_slice(&bytes);
190        return Some(Cid {
191            hash,
192            key: Some(key),
193        });
194    }
195
196    if encrypted_key.is_some() {
197        return None;
198    }
199
200    Some(Cid { hash, key: None })
201}
202
203fn classify_sync_event(
204    key: &str,
205    author_hex: &str,
206    my_pubkey: &PublicKey,
207    pinned_refs: &HashSet<String>,
208    tracked_authors: &HashSet<String>,
209    followed_authors: &HashSet<String>,
210) -> Option<SyncPriority> {
211    if pinned_refs.contains(key) {
212        return Some(SyncPriority::Pinned);
213    }
214
215    if tracked_authors.contains(author_hex) {
216        return Some(SyncPriority::TrackedAuthor);
217    }
218
219    if author_hex == my_pubkey.to_hex() {
220        return Some(SyncPriority::Own);
221    }
222
223    if followed_authors.contains(author_hex) {
224        return Some(SyncPriority::Followed);
225    }
226
227    None
228}
229
230fn apply_synced_tree_update(store: &HashtreeStore, task: &SyncTask) -> Result<()> {
231    let (owner, name) = task
232        .key
233        .split_once('/')
234        .map(|(o, n)| (o.to_string(), Some(n)))
235        .unwrap_or((task.key.clone(), None));
236
237    let storage_priority = match task.priority {
238        SyncPriority::Pinned | SyncPriority::TrackedAuthor | SyncPriority::Own => PRIORITY_OWN,
239        SyncPriority::Followed => PRIORITY_FOLLOWED,
240    };
241
242    if matches!(
243        task.priority,
244        SyncPriority::Pinned | SyncPriority::TrackedAuthor
245    ) {
246        store.pin(&task.cid.hash)?;
247    }
248
249    store.index_tree(
250        &task.cid.hash,
251        &owner,
252        name,
253        storage_priority,
254        Some(&task.key),
255    )?;
256
257    store.evict_if_needed()?;
258    Ok(())
259}
260
/// Background sync service
///
/// Owns the Nostr client, the priority-ordered sync queue, and the sets that
/// decide which incoming tree events are accepted. All mutable state sits
/// behind `Arc<RwLock<..>>` so the spawned worker and fetch tasks can share it.
pub struct BackgroundSync {
    /// Settings the service was created with
    config: SyncConfig,
    /// Local store that fetched trees are written to and indexed in
    store: Arc<HashtreeStore>,
    /// Optional WebRTC state handed to the fetcher on every fetch
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client for subscriptions
    client: Client,
    /// Our public key
    my_pubkey: PublicKey,
    /// Subscribed trees
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Followed authors that are allowed to generate sync tasks
    followed_authors: Arc<RwLock<HashSet<String>>>,
    /// Currently pinned mutable refs that should keep following updates
    pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Authors whose readable trees should be mirrored continuously
    tracked_authors: Arc<RwLock<HashSet<String>>>,
    /// Exact pinned refs already subscribed at the relay layer
    subscribed_pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Tracked authors already subscribed at the relay layer
    subscribed_tracked_authors: Arc<RwLock<HashSet<String>>>,
    /// Local signing keys available for decrypting owner-private roots
    author_signing_keys: Arc<RwLock<HashMap<String, Keys>>>,
    /// Sync queue
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Currently syncing hashes
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Shutdown signal
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Receiving side of the shutdown signal; cloned for each loop that must
    /// observe it
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Fetcher for remote content
    fetcher: Arc<Fetcher>,
}
294
295impl BackgroundSync {
296    /// Create a new background sync service
297    pub async fn new(
298        config: SyncConfig,
299        store: Arc<HashtreeStore>,
300        keys: Keys,
301        webrtc_state: Option<Arc<WebRTCState>>,
302    ) -> Result<Self> {
303        let my_pubkey = keys.public_key();
304        let client = Client::new(keys);
305
306        // Add relays
307        for relay in &config.relays {
308            if let Err(e) = client.add_relay(relay).await {
309                warn!("Failed to add relay {}: {}", relay, e);
310            }
311        }
312
313        // Connect to relays
314        client.connect().await;
315
316        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
317
318        // Create fetcher with config
319        // BlossomClient auto-loads servers from ~/.hashtree/config.toml
320        let fetch_config = FetchConfig {
321            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
322            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
323        };
324        let fetcher = Arc::new(Fetcher::new(fetch_config));
325
326        Ok(Self {
327            config,
328            store,
329            webrtc_state,
330            client,
331            my_pubkey,
332            subscriptions: Arc::new(RwLock::new(HashMap::new())),
333            followed_authors: Arc::new(RwLock::new(HashSet::new())),
334            pinned_refs: Arc::new(RwLock::new(HashSet::new())),
335            tracked_authors: Arc::new(RwLock::new(HashSet::new())),
336            subscribed_pinned_refs: Arc::new(RwLock::new(HashSet::new())),
337            subscribed_tracked_authors: Arc::new(RwLock::new(HashSet::new())),
338            author_signing_keys: Arc::new(RwLock::new(load_author_signing_keys())),
339            queue: Arc::new(RwLock::new(VecDeque::new())),
340            syncing: Arc::new(RwLock::new(HashSet::new())),
341            shutdown_tx,
342            shutdown_rx,
343            fetcher,
344        })
345    }
346
347    /// Start the background sync service
348    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
349        info!("Starting background sync service");
350
351        // Wait for relays to connect before subscribing
352        tokio::time::sleep(Duration::from_secs(3)).await;
353
354        self.refresh_author_signing_keys().await;
355        self.refresh_pinned_ref_subscriptions().await?;
356        self.refresh_tracked_author_subscriptions().await?;
357
358        // Subscribe to own trees
359        if self.config.sync_own {
360            self.subscribe_own_trees().await?;
361        }
362
363        // Subscribe to followed users' trees
364        if self.config.sync_followed {
365            self.subscribe_followed_trees(&contacts_file).await?;
366        }
367
368        // Start sync worker
369        let queue = self.queue.clone();
370        let syncing = self.syncing.clone();
371        let store = self.store.clone();
372        let webrtc_state = self.webrtc_state.clone();
373        let fetcher = self.fetcher.clone();
374        let max_concurrent = self.config.max_concurrent;
375        let mut shutdown_rx = self.shutdown_rx.clone();
376
377        // Spawn sync worker task
378        tokio::spawn(async move {
379            let mut interval = tokio::time::interval(Duration::from_millis(500));
380
381            loop {
382                tokio::select! {
383                    _ = shutdown_rx.changed() => {
384                        if *shutdown_rx.borrow() {
385                            info!("Sync worker shutting down");
386                            break;
387                        }
388                    }
389                    _ = interval.tick() => {
390                        // Check if we can start more sync tasks
391                        let current_syncing = syncing.read().await.len();
392                        if current_syncing >= max_concurrent {
393                            continue;
394                        }
395
396                        // Get next task from queue
397                        let task = {
398                            let mut q = queue.write().await;
399                            q.pop_front()
400                        };
401
402                        if let Some(task) = task {
403                            let hash_hex = to_hex(&task.cid.hash);
404
405                            // Check if already syncing
406                            {
407                                let mut s = syncing.write().await;
408                                if s.contains(&hash_hex) {
409                                    continue;
410                                }
411                                s.insert(hash_hex.clone());
412                            }
413
414                            // Spawn sync task
415                            let syncing_clone = syncing.clone();
416                            let store_clone = store.clone();
417                            let webrtc_clone = webrtc_state.clone();
418                            let fetcher_clone = fetcher.clone();
419
420                            tokio::spawn(async move {
421                                let result = fetcher_clone.fetch_cid_tree(
422                                    &store_clone,
423                                    webrtc_clone.as_ref(),
424                                    &task.cid,
425                                ).await;
426
427                                match result {
428                                    Ok((chunks_fetched, bytes_fetched)) => {
429                                        if chunks_fetched > 0 {
430                                            info!(
431                                                "Synced tree {} ({} chunks, {} bytes)",
432                                                &hash_hex[..12],
433                                                chunks_fetched,
434                                                bytes_fetched
435                                            );
436                                        } else {
437                                            tracing::debug!(
438                                                "Tree {} already present locally; applying ref update",
439                                                &hash_hex[..12]
440                                            );
441                                        }
442
443                                        match store_clone.blob_exists(&task.cid.hash) {
444                                            Ok(true) => {}
445                                            Ok(false) => {
446                                                warn!(
447                                                    "Skipping ref update for {} because root {} is still missing locally",
448                                                    task.key,
449                                                    &hash_hex[..12]
450                                                );
451                                                syncing_clone.write().await.remove(&hash_hex);
452                                                return;
453                                            }
454                                            Err(err) => {
455                                                warn!(
456                                                    "Failed to verify synced root {} before indexing {}: {}",
457                                                    &hash_hex[..12],
458                                                    task.key,
459                                                    err
460                                                );
461                                                syncing_clone.write().await.remove(&hash_hex);
462                                                return;
463                                            }
464                                        }
465
466                                        if let Err(e) = apply_synced_tree_update(&store_clone, &task) {
467                                            warn!("Failed to apply synced tree {}: {}", &hash_hex[..12], e);
468                                        }
469                                    }
470                                    Err(e) => {
471                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
472                                    }
473                                }
474
475                                // Remove from syncing set
476                                syncing_clone.write().await.remove(&hash_hex);
477                            });
478                        }
479                    }
480                }
481            }
482        });
483
484        // Handle Nostr notifications for tree updates
485        let mut notifications = self.client.notifications();
486        let subscriptions = self.subscriptions.clone();
487        let queue = self.queue.clone();
488        let mut pinned_refresh = tokio::time::interval(Duration::from_secs(5));
489        let mut shutdown_rx = self.shutdown_rx.clone();
490
491        loop {
492            tokio::select! {
493                _ = shutdown_rx.changed() => {
494                    if *shutdown_rx.borrow() {
495                        info!("Background sync shutting down");
496                        break;
497                    }
498                }
499                _ = pinned_refresh.tick() => {
500                    self.refresh_author_signing_keys().await;
501                    if let Err(err) = self.refresh_pinned_ref_subscriptions().await {
502                        warn!("Failed to refresh pinned ref subscriptions: {}", err);
503                    }
504                    if let Err(err) = self.refresh_tracked_author_subscriptions().await {
505                        warn!("Failed to refresh tracked author subscriptions: {}", err);
506                    }
507                }
508                notification = notifications.recv() => {
509                    match notification {
510                        Ok(RelayPoolNotification::Event { event, .. }) => {
511                            self.handle_tree_event(&event, &subscriptions, &queue).await;
512                        }
513                        Ok(_) => {}
514                        Err(e) => {
515                            error!("Notification error: {}", e);
516                            break;
517                        }
518                    }
519                }
520            }
521        }
522
523        Ok(())
524    }
525
526    async fn refresh_pinned_ref_subscriptions(&self) -> Result<()> {
527        let current_refs: HashSet<String> = self.store.list_pinned_refs()?.into_iter().collect();
528        {
529            let mut pinned_refs = self.pinned_refs.write().await;
530            *pinned_refs = current_refs.clone();
531        }
532
533        {
534            let mut subscriptions = self.subscriptions.write().await;
535            subscriptions.retain(|key, sub| {
536                sub.priority != SyncPriority::Pinned || current_refs.contains(key)
537            });
538        }
539
540        let new_refs: Vec<String> = {
541            let subscribed = self.subscribed_pinned_refs.read().await;
542            current_refs
543                .iter()
544                .filter(|key| !subscribed.contains(*key))
545                .cloned()
546                .collect()
547        };
548
549        for key in new_refs {
550            let filter = match build_exact_tree_filter(&key) {
551                Ok(filter) => filter,
552                Err(err) => {
553                    warn!("Ignoring invalid pinned ref {}: {}", key, err);
554                    continue;
555                }
556            };
557
558            match self.client.subscribe(vec![filter], None).await {
559                Ok(_) => {
560                    info!("Subscribed to pinned ref {}", key);
561                    self.subscribed_pinned_refs.write().await.insert(key);
562                }
563                Err(err) => {
564                    warn!(
565                        "Failed to subscribe to pinned ref (will retry on refresh): {}",
566                        err
567                    );
568                }
569            }
570        }
571
572        Ok(())
573    }
574
575    async fn refresh_author_signing_keys(&self) {
576        let mut author_signing_keys = self.author_signing_keys.write().await;
577        *author_signing_keys = load_author_signing_keys();
578    }
579
580    async fn refresh_tracked_author_subscriptions(&self) -> Result<()> {
581        let tracked_npubs = self.store.list_tracked_authors()?;
582        let parsed_authors: Vec<(String, PublicKey, String)> = tracked_npubs
583            .into_iter()
584            .filter_map(|npub| match PublicKey::from_bech32(&npub) {
585                Ok(pubkey) => Some((npub, pubkey, pubkey.to_hex())),
586                Err(err) => {
587                    warn!("Ignoring invalid tracked author {}: {}", npub, err);
588                    None
589                }
590            })
591            .collect();
592        let current_authors: HashSet<String> = parsed_authors
593            .iter()
594            .map(|(_, _, author_hex)| author_hex.clone())
595            .collect();
596
597        {
598            let mut tracked_authors = self.tracked_authors.write().await;
599            *tracked_authors = current_authors.clone();
600        }
601
602        {
603            let mut subscriptions = self.subscriptions.write().await;
604            subscriptions.retain(|key, sub| {
605                if sub.priority != SyncPriority::TrackedAuthor {
606                    return true;
607                }
608
609                let Some((npub, _)) = key.split_once('/') else {
610                    return false;
611                };
612                let Ok(author) = PublicKey::from_bech32(npub) else {
613                    return false;
614                };
615                current_authors.contains(&author.to_hex())
616            });
617        }
618
619        let new_authors: Vec<(String, PublicKey)> = {
620            let subscribed = self.subscribed_tracked_authors.read().await;
621            parsed_authors
622                .iter()
623                .filter(|(_, _, author_hex)| !subscribed.contains(author_hex))
624                .map(|(_, pubkey, author_hex)| (author_hex.clone(), *pubkey))
625                .collect()
626        };
627
628        for (author_hex, author) in new_authors {
629            match self
630                .client
631                .subscribe(vec![build_author_tree_filter(author)], None)
632                .await
633            {
634                Ok(_) => {
635                    info!(
636                        "Subscribed to tracked author {}",
637                        author.to_bech32().unwrap_or(author_hex.clone())
638                    );
639                    self.subscribed_tracked_authors
640                        .write()
641                        .await
642                        .insert(author_hex);
643                }
644                Err(err) => {
645                    warn!(
646                        "Failed to subscribe to tracked author (will retry on refresh): {}",
647                        err
648                    );
649                }
650            }
651        }
652
653        Ok(())
654    }
655
656    /// Subscribe to own trees (kind 30078 events from our pubkey)
657    async fn subscribe_own_trees(&self) -> Result<()> {
658        let filter = build_author_tree_filter(self.my_pubkey);
659
660        match self.client.subscribe(vec![filter], None).await {
661            Ok(_) => {
662                info!(
663                    "Subscribed to own trees for {}",
664                    self.my_pubkey.to_bech32().unwrap_or_default()
665                );
666            }
667            Err(e) => {
668                warn!(
669                    "Failed to subscribe to own trees (will retry on reconnect): {}",
670                    e
671                );
672            }
673        }
674
675        Ok(())
676    }
677
678    /// Subscribe to followed users' trees
679    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
680        // Load contacts from file
681        let contacts: Vec<String> = if contacts_file.exists() {
682            let data = std::fs::read_to_string(contacts_file)?;
683            serde_json::from_str(&data).unwrap_or_default()
684        } else {
685            Vec::new()
686        };
687
688        if contacts.is_empty() {
689            self.followed_authors.write().await.clear();
690            info!("No contacts to subscribe to");
691            return Ok(());
692        }
693
694        {
695            let mut followed_authors = self.followed_authors.write().await;
696            *followed_authors = contacts.iter().cloned().collect();
697        }
698
699        // Convert hex pubkeys to PublicKey
700        let pubkeys: Vec<PublicKey> = contacts
701            .iter()
702            .filter_map(|hex| PublicKey::from_hex(hex).ok())
703            .collect();
704
705        if pubkeys.is_empty() {
706            return Ok(());
707        }
708
709        // Subscribe to all followed users' hashtree events
710        let filter = Filter::new()
711            .kind(Kind::Custom(30078))
712            .authors(pubkeys.clone())
713            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
714
715        match self.client.subscribe(vec![filter], None).await {
716            Ok(_) => {
717                info!("Subscribed to {} followed users' trees", pubkeys.len());
718            }
719            Err(e) => {
720                warn!(
721                    "Failed to subscribe to followed trees (will retry on reconnect): {}",
722                    e
723                );
724            }
725        }
726
727        Ok(())
728    }
729
730    /// Handle incoming tree event
731    async fn handle_tree_event(
732        &self,
733        event: &Event,
734        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
735        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
736    ) {
737        // Check if it's a hashtree event
738        let has_hashtree_tag = event.tags.iter().any(|tag| {
739            let v = tag.as_slice();
740            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
741        });
742
743        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
744            return;
745        }
746
747        // Extract d-tag (tree name)
748        let d_tag = event.tags.iter().find_map(|tag| {
749            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
750                Some(id.clone())
751            } else {
752                None
753            }
754        });
755
756        let tree_name = match d_tag {
757            Some(name) => name,
758            None => return,
759        };
760
761        // Build key
762        let npub = event
763            .pubkey
764            .to_bech32()
765            .unwrap_or_else(|_| event.pubkey.to_hex());
766        let key = format!("{}/{}", npub, tree_name);
767
768        let author_hex = event.pubkey.to_hex();
769        let pinned_refs = self.pinned_refs.read().await.clone();
770        let tracked_authors = self.tracked_authors.read().await.clone();
771        let followed_authors = self.followed_authors.read().await.clone();
772
773        // Determine priority and ignore stale events from refs we no longer care about.
774        let Some(priority) = classify_sync_event(
775            &key,
776            &author_hex,
777            &self.my_pubkey,
778            &pinned_refs,
779            &tracked_authors,
780            &followed_authors,
781        ) else {
782            return;
783        };
784
785        let author_keys = self
786            .author_signing_keys
787            .read()
788            .await
789            .get(&author_hex)
790            .cloned();
791        let Some(cid) = cid_from_tree_event(event, author_keys.as_ref()) else {
792            return;
793        };
794
795        // Check if we need to sync
796        let should_sync = {
797            let mut subs = subscriptions.write().await;
798            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
799                key: key.clone(),
800                current_cid: None,
801                priority,
802                last_synced: None,
803            });
804
805            // Check if CID changed
806            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
807            if changed {
808                sub.current_cid = Some(cid.clone());
809                true
810            } else {
811                false
812            }
813        };
814
815        if should_sync {
816            info!(
817                "New tree update: {} -> {}",
818                key,
819                to_hex(&cid.hash)[..12].to_string()
820            );
821
822            // Add to sync queue
823            let task = SyncTask {
824                key,
825                cid,
826                priority,
827                queued_at: Instant::now(),
828            };
829
830            let mut q = queue.write().await;
831
832            // Insert based on priority (own trees first)
833            let insert_pos = q
834                .iter()
835                .position(|t| t.priority > task.priority)
836                .unwrap_or(q.len());
837            q.insert(insert_pos, task);
838        }
839    }
840
841    /// Signal shutdown
842    pub fn shutdown(&self) {
843        let _ = self.shutdown_tx.send(true);
844    }
845
846    /// Queue a manual sync for a specific tree
847    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
848        let task = SyncTask {
849            key: key.to_string(),
850            cid,
851            priority,
852            queued_at: Instant::now(),
853        };
854
855        let mut q = self.queue.write().await;
856        let insert_pos = q
857            .iter()
858            .position(|t| t.priority > task.priority)
859            .unwrap_or(q.len());
860        q.insert(insert_pos, task);
861    }
862
863    /// Get current sync status
864    pub async fn status(&self) -> SyncStatus {
865        let subscriptions = self.subscriptions.read().await;
866        let queue = self.queue.read().await;
867        let syncing = self.syncing.read().await;
868
869        SyncStatus {
870            subscribed_trees: subscriptions.len(),
871            queued_tasks: queue.len(),
872            active_syncs: syncing.len(),
873        }
874    }
875}
876
/// Overall sync status
///
/// A plain bundle of counters. `Default` yields an all-zero status, and
/// `PartialEq`/`Eq` allow two snapshots to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncStatus {
    /// Number of tree subscriptions currently tracked.
    pub subscribed_trees: usize,
    /// Number of sync tasks waiting in the queue.
    pub queued_tasks: usize,
    /// Number of entries in the set of syncs currently in progress.
    pub active_syncs: usize,
}
884
#[cfg(test)]
mod tests {
    use super::*;
    use nostr_sdk::Keys;
    use std::fs;
    use tempfile::TempDir;

    /// Creates `base/name` containing a single `README.md` with `body`, uploads that directory
    /// to `store`, and returns the parsed root CID after removing the pin left by the upload.
    fn upload_repo_root(
        store: &HashtreeStore,
        base: &std::path::Path,
        name: &str,
        body: &str,
    ) -> Cid {
        let repo_dir = base.join(name);
        fs::create_dir_all(&repo_dir).expect("create repo dir");
        fs::write(repo_dir.join("README.md"), body).expect("write repo file");

        let uploaded = store
            .upload_dir_with_options(&repo_dir, true)
            .expect("upload repo directory");
        let root = Cid::parse(&uploaded).expect("parse repo cid");
        store.unpin(&root.hash).expect("clear upload auto-pin");
        root
    }

    /// Opens a store in a fresh temporary directory. The returned `TempDir` guard must be kept
    /// alive for as long as the store is used.
    fn temp_store() -> (TempDir, HashtreeStore) {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
        (temp_dir, store)
    }

    /// Builds a `<npub>/repo` key for a freshly generated owner.
    fn random_repo_key() -> String {
        let owner_npub = Keys::generate()
            .public_key()
            .to_bech32()
            .expect("repo owner npub");
        format!("{}/repo", owner_npub)
    }

    /// Builds a sync task for `key` pointing at `cid`, stamped with the current instant.
    fn task_for(key: &str, cid: &Cid, priority: SyncPriority) -> SyncTask {
        SyncTask {
            key: key.to_string(),
            cid: cid.clone(),
            priority,
            queued_at: Instant::now(),
        }
    }

    /// With no pinned refs, tracked authors or follows supplied, an event from an unrelated
    /// author must not be classified for syncing.
    #[test]
    fn classify_sync_event_ignores_removed_pinned_refs() {
        let keys = Keys::generate();
        let author = Keys::generate().public_key();
        let author_npub = author.to_bech32().expect("author npub");
        let key = format!("{}/repo", author_npub);
        let author_hex = author.to_hex();
        let own_pubkey = keys.public_key();

        let priority = classify_sync_event(
            &key,
            &author_hex,
            &own_pubkey,
            &HashSet::new(),
            &HashSet::new(),
            &HashSet::new(),
        );

        assert_eq!(priority, None);
    }

    /// An event whose author is in the tracked-author set is classified as `TrackedAuthor`.
    #[test]
    fn classify_sync_event_prioritizes_tracked_authors() {
        let keys = Keys::generate();
        let author = Keys::generate().public_key();
        let author_npub = author.to_bech32().expect("author npub");
        let key = format!("{}/repo", author_npub);
        let author_hex = author.to_hex();
        let own_pubkey = keys.public_key();

        let tracked_authors = HashSet::from([author.to_hex()]);

        let priority = classify_sync_event(
            &key,
            &author_hex,
            &own_pubkey,
            &HashSet::new(),
            &tracked_authors,
            &HashSet::new(),
        );

        assert_eq!(priority, Some(SyncPriority::TrackedAuthor));
    }

    /// A tree event carrying a self-encrypted root key can be turned back into a full CID when
    /// the author's own keys are supplied.
    #[test]
    fn tracked_author_private_event_uses_matching_local_key() {
        let author = Keys::generate();
        let root_hash = [0x11; 32];
        let root_key = [0x22; 32];

        // Encrypt the hex-encoded root key from the author to themselves.
        let root_key_hex = hex::encode(root_key);
        let ciphertext = nip44::encrypt(
            author.secret_key(),
            &author.public_key(),
            root_key_hex,
            nip44::Version::V2,
        )
        .expect("encrypt private root key");

        let tags = vec![
            Tag::identifier("backup".to_string()),
            Tag::custom(
                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
                vec!["hashtree"],
            ),
            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(root_hash)]),
            Tag::custom(TagKind::Custom("selfEncryptedKey".into()), vec![ciphertext]),
        ];
        let event = EventBuilder::new(Kind::Custom(30078), "", tags)
            .to_event(&author)
            .expect("sign private root event");

        let cid = cid_from_tree_event(&event, Some(&author)).expect("decrypt tracked private cid");

        assert_eq!(cid.hash, root_hash);
        assert_eq!(cid.key, Some(root_key));
    }

    /// Applying a second `Pinned` update for the same key moves the pin and tree ref to the new
    /// root and drops both the pin and the metadata of the superseded root.
    #[test]
    fn pinned_sync_update_replaces_old_root_pin() {
        let (temp_dir, store) = temp_store();
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
        let repo_key = random_repo_key();

        let first_task = task_for(&repo_key, &first_cid, SyncPriority::Pinned);
        apply_synced_tree_update(&store, &first_task).expect("apply first sync update");

        let first_pinned = store.is_pinned(&first_cid.hash).expect("first root pinned");
        assert!(first_pinned);
        let first_ref = store.get_tree_ref(&repo_key).expect("first tree ref");
        assert_eq!(first_ref, Some(first_cid.hash));

        let second_task = task_for(&repo_key, &second_cid, SyncPriority::Pinned);
        apply_synced_tree_update(&store, &second_task).expect("apply second sync update");

        let first_still_pinned = store
            .is_pinned(&first_cid.hash)
            .expect("first root pin status");
        assert!(
            !first_still_pinned,
            "updating a pinned ref should unpin the superseded root"
        );

        let second_pinned = store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned");
        assert!(second_pinned);

        let updated_ref = store.get_tree_ref(&repo_key).expect("updated tree ref");
        assert_eq!(updated_ref, Some(second_cid.hash));

        let first_meta = store
            .get_tree_meta(&first_cid.hash)
            .expect("first meta lookup");
        assert!(
            first_meta.is_none(),
            "superseded pinned root should be unindexed after update"
        );
    }

    /// Applying a second `TrackedAuthor` update for the same key moves the pin and tree ref to
    /// the new root and unpins the superseded root.
    #[test]
    fn tracked_author_sync_update_replaces_old_root_pin() {
        let (temp_dir, store) = temp_store();
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
        let repo_key = random_repo_key();

        let first_task = task_for(&repo_key, &first_cid, SyncPriority::TrackedAuthor);
        apply_synced_tree_update(&store, &first_task).expect("apply first tracked sync update");

        let first_pinned = store.is_pinned(&first_cid.hash).expect("first root pinned");
        assert!(first_pinned);
        let first_ref = store.get_tree_ref(&repo_key).expect("first tree ref");
        assert_eq!(first_ref, Some(first_cid.hash));

        let second_task = task_for(&repo_key, &second_cid, SyncPriority::TrackedAuthor);
        apply_synced_tree_update(&store, &second_task).expect("apply second tracked sync update");

        let first_still_pinned = store
            .is_pinned(&first_cid.hash)
            .expect("first root pin status");
        assert!(
            !first_still_pinned,
            "updating a tracked author ref should unpin the superseded root"
        );

        let second_pinned = store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned");
        assert!(second_pinned);

        let updated_ref = store.get_tree_ref(&repo_key).expect("updated tree ref");
        assert_eq!(updated_ref, Some(second_cid.hash));
    }
}