1use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
21use crate::webrtc::WebRTCState;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
25pub enum SyncPriority {
26 Pinned = 0,
28 Own = 1,
30 Followed = 2,
32}
33
34#[derive(Debug, Clone)]
36pub struct SyncTask {
37 pub key: String,
39 pub cid: Cid,
41 pub priority: SyncPriority,
43 pub queued_at: Instant,
45}
46
47#[derive(Debug, Clone)]
49pub struct SyncConfig {
50 pub sync_own: bool,
52 pub sync_followed: bool,
54 pub relays: Vec<String>,
56 pub max_concurrent: usize,
58 pub webrtc_timeout_ms: u64,
60 pub blossom_timeout_ms: u64,
62}
63
64impl Default for SyncConfig {
65 fn default() -> Self {
66 Self {
67 sync_own: true,
68 sync_followed: true,
69 relays: hashtree_config::DEFAULT_RELAYS
70 .iter()
71 .map(|s| s.to_string())
72 .collect(),
73 max_concurrent: 3,
74 webrtc_timeout_ms: 2000,
75 blossom_timeout_ms: 10000,
76 }
77 }
78}
79
80impl SyncConfig {
81 pub fn from_config(config: &hashtree_config::Config) -> Self {
83 Self {
84 sync_own: true,
85 sync_followed: true,
86 relays: config.nostr.relays.clone(),
87 max_concurrent: 3,
88 webrtc_timeout_ms: 2000,
89 blossom_timeout_ms: 10000,
90 }
91 }
92}
93
94#[allow(dead_code)]
96struct TreeSubscription {
97 key: String,
98 current_cid: Option<Cid>,
99 priority: SyncPriority,
100 last_synced: Option<Instant>,
101}
102
103fn build_exact_tree_filter(key: &str) -> Result<Filter> {
104 let (npub, tree_name) = key
105 .split_once('/')
106 .ok_or_else(|| anyhow::anyhow!("Invalid pinned ref key: {}", key))?;
107 let author = PublicKey::from_bech32(npub)
108 .map_err(|_| anyhow::anyhow!("Invalid npub in pinned ref key: {}", key))?;
109
110 Ok(Filter::new()
111 .kind(Kind::Custom(30078))
112 .author(author)
113 .custom_tag(
114 SingleLetterTag::lowercase(Alphabet::D),
115 vec![tree_name.to_string()],
116 )
117 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]))
118}
119
120fn classify_sync_event(
121 key: &str,
122 author_hex: &str,
123 my_pubkey: &PublicKey,
124 pinned_refs: &HashSet<String>,
125 followed_authors: &HashSet<String>,
126) -> Option<SyncPriority> {
127 if pinned_refs.contains(key) {
128 return Some(SyncPriority::Pinned);
129 }
130
131 if author_hex == my_pubkey.to_hex() {
132 return Some(SyncPriority::Own);
133 }
134
135 if followed_authors.contains(author_hex) {
136 return Some(SyncPriority::Followed);
137 }
138
139 None
140}
141
142fn apply_synced_tree_update(store: &HashtreeStore, task: &SyncTask) -> Result<()> {
143 let (owner, name) = task
144 .key
145 .split_once('/')
146 .map(|(o, n)| (o.to_string(), Some(n)))
147 .unwrap_or((task.key.clone(), None));
148
149 let storage_priority = match task.priority {
150 SyncPriority::Pinned | SyncPriority::Own => PRIORITY_OWN,
151 SyncPriority::Followed => PRIORITY_FOLLOWED,
152 };
153
154 if task.priority == SyncPriority::Pinned {
155 store.pin(&task.cid.hash)?;
156 }
157
158 store.index_tree(
159 &task.cid.hash,
160 &owner,
161 name,
162 storage_priority,
163 Some(&task.key),
164 )?;
165
166 store.evict_if_needed()?;
167 Ok(())
168}
169
170pub struct BackgroundSync {
172 config: SyncConfig,
173 store: Arc<HashtreeStore>,
174 webrtc_state: Option<Arc<WebRTCState>>,
175 client: Client,
177 my_pubkey: PublicKey,
179 subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
181 followed_authors: Arc<RwLock<HashSet<String>>>,
183 pinned_refs: Arc<RwLock<HashSet<String>>>,
185 subscribed_pinned_refs: Arc<RwLock<HashSet<String>>>,
187 queue: Arc<RwLock<VecDeque<SyncTask>>>,
189 syncing: Arc<RwLock<HashSet<String>>>,
191 shutdown_tx: tokio::sync::watch::Sender<bool>,
193 shutdown_rx: tokio::sync::watch::Receiver<bool>,
194 fetcher: Arc<Fetcher>,
196}
197
198impl BackgroundSync {
199 pub async fn new(
201 config: SyncConfig,
202 store: Arc<HashtreeStore>,
203 keys: Keys,
204 webrtc_state: Option<Arc<WebRTCState>>,
205 ) -> Result<Self> {
206 let my_pubkey = keys.public_key();
207 let client = Client::new(keys);
208
209 for relay in &config.relays {
211 if let Err(e) = client.add_relay(relay).await {
212 warn!("Failed to add relay {}: {}", relay, e);
213 }
214 }
215
216 client.connect().await;
218
219 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
220
221 let fetch_config = FetchConfig {
224 webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
225 blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
226 };
227 let fetcher = Arc::new(Fetcher::new(fetch_config));
228
229 Ok(Self {
230 config,
231 store,
232 webrtc_state,
233 client,
234 my_pubkey,
235 subscriptions: Arc::new(RwLock::new(HashMap::new())),
236 followed_authors: Arc::new(RwLock::new(HashSet::new())),
237 pinned_refs: Arc::new(RwLock::new(HashSet::new())),
238 subscribed_pinned_refs: Arc::new(RwLock::new(HashSet::new())),
239 queue: Arc::new(RwLock::new(VecDeque::new())),
240 syncing: Arc::new(RwLock::new(HashSet::new())),
241 shutdown_tx,
242 shutdown_rx,
243 fetcher,
244 })
245 }
246
247 pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
249 info!("Starting background sync service");
250
251 tokio::time::sleep(Duration::from_secs(3)).await;
253
254 self.refresh_pinned_ref_subscriptions().await?;
255
256 if self.config.sync_own {
258 self.subscribe_own_trees().await?;
259 }
260
261 if self.config.sync_followed {
263 self.subscribe_followed_trees(&contacts_file).await?;
264 }
265
266 let queue = self.queue.clone();
268 let syncing = self.syncing.clone();
269 let store = self.store.clone();
270 let webrtc_state = self.webrtc_state.clone();
271 let fetcher = self.fetcher.clone();
272 let max_concurrent = self.config.max_concurrent;
273 let mut shutdown_rx = self.shutdown_rx.clone();
274
275 tokio::spawn(async move {
277 let mut interval = tokio::time::interval(Duration::from_millis(500));
278
279 loop {
280 tokio::select! {
281 _ = shutdown_rx.changed() => {
282 if *shutdown_rx.borrow() {
283 info!("Sync worker shutting down");
284 break;
285 }
286 }
287 _ = interval.tick() => {
288 let current_syncing = syncing.read().await.len();
290 if current_syncing >= max_concurrent {
291 continue;
292 }
293
294 let task = {
296 let mut q = queue.write().await;
297 q.pop_front()
298 };
299
300 if let Some(task) = task {
301 let hash_hex = to_hex(&task.cid.hash);
302
303 {
305 let mut s = syncing.write().await;
306 if s.contains(&hash_hex) {
307 continue;
308 }
309 s.insert(hash_hex.clone());
310 }
311
312 let syncing_clone = syncing.clone();
314 let store_clone = store.clone();
315 let webrtc_clone = webrtc_state.clone();
316 let fetcher_clone = fetcher.clone();
317
318 tokio::spawn(async move {
319 let result = fetcher_clone.fetch_tree(
320 &store_clone,
321 webrtc_clone.as_ref(),
322 &task.cid.hash,
323 ).await;
324
325 match result {
326 Ok((chunks_fetched, bytes_fetched)) => {
327 if chunks_fetched > 0 {
328 info!(
329 "Synced tree {} ({} chunks, {} bytes)",
330 &hash_hex[..12],
331 chunks_fetched,
332 bytes_fetched
333 );
334 } else {
335 tracing::debug!(
336 "Tree {} already present locally; applying ref update",
337 &hash_hex[..12]
338 );
339 }
340
341 if let Err(e) = apply_synced_tree_update(&store_clone, &task) {
342 warn!("Failed to apply synced tree {}: {}", &hash_hex[..12], e);
343 }
344 }
345 Err(e) => {
346 warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
347 }
348 }
349
350 syncing_clone.write().await.remove(&hash_hex);
352 });
353 }
354 }
355 }
356 }
357 });
358
359 let mut notifications = self.client.notifications();
361 let subscriptions = self.subscriptions.clone();
362 let queue = self.queue.clone();
363 let mut pinned_refresh = tokio::time::interval(Duration::from_secs(5));
364 let mut shutdown_rx = self.shutdown_rx.clone();
365
366 loop {
367 tokio::select! {
368 _ = shutdown_rx.changed() => {
369 if *shutdown_rx.borrow() {
370 info!("Background sync shutting down");
371 break;
372 }
373 }
374 _ = pinned_refresh.tick() => {
375 if let Err(err) = self.refresh_pinned_ref_subscriptions().await {
376 warn!("Failed to refresh pinned ref subscriptions: {}", err);
377 }
378 }
379 notification = notifications.recv() => {
380 match notification {
381 Ok(RelayPoolNotification::Event { event, .. }) => {
382 self.handle_tree_event(&event, &subscriptions, &queue).await;
383 }
384 Ok(_) => {}
385 Err(e) => {
386 error!("Notification error: {}", e);
387 break;
388 }
389 }
390 }
391 }
392 }
393
394 Ok(())
395 }
396
397 async fn refresh_pinned_ref_subscriptions(&self) -> Result<()> {
398 let current_refs: HashSet<String> = self.store.list_pinned_refs()?.into_iter().collect();
399 {
400 let mut pinned_refs = self.pinned_refs.write().await;
401 *pinned_refs = current_refs.clone();
402 }
403
404 {
405 let mut subscriptions = self.subscriptions.write().await;
406 subscriptions.retain(|key, sub| {
407 sub.priority != SyncPriority::Pinned || current_refs.contains(key)
408 });
409 }
410
411 let new_refs: Vec<String> = {
412 let subscribed = self.subscribed_pinned_refs.read().await;
413 current_refs
414 .iter()
415 .filter(|key| !subscribed.contains(*key))
416 .cloned()
417 .collect()
418 };
419
420 for key in new_refs {
421 let filter = match build_exact_tree_filter(&key) {
422 Ok(filter) => filter,
423 Err(err) => {
424 warn!("Ignoring invalid pinned ref {}: {}", key, err);
425 continue;
426 }
427 };
428
429 match self.client.subscribe(vec![filter], None).await {
430 Ok(_) => {
431 info!("Subscribed to pinned ref {}", key);
432 self.subscribed_pinned_refs.write().await.insert(key);
433 }
434 Err(err) => {
435 warn!(
436 "Failed to subscribe to pinned ref (will retry on refresh): {}",
437 err
438 );
439 }
440 }
441 }
442
443 Ok(())
444 }
445
446 async fn subscribe_own_trees(&self) -> Result<()> {
448 let filter = Filter::new()
449 .kind(Kind::Custom(30078))
450 .author(self.my_pubkey)
451 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
452
453 match self.client.subscribe(vec![filter], None).await {
454 Ok(_) => {
455 info!(
456 "Subscribed to own trees for {}",
457 self.my_pubkey.to_bech32().unwrap_or_default()
458 );
459 }
460 Err(e) => {
461 warn!(
462 "Failed to subscribe to own trees (will retry on reconnect): {}",
463 e
464 );
465 }
466 }
467
468 Ok(())
469 }
470
471 async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
473 let contacts: Vec<String> = if contacts_file.exists() {
475 let data = std::fs::read_to_string(contacts_file)?;
476 serde_json::from_str(&data).unwrap_or_default()
477 } else {
478 Vec::new()
479 };
480
481 if contacts.is_empty() {
482 self.followed_authors.write().await.clear();
483 info!("No contacts to subscribe to");
484 return Ok(());
485 }
486
487 {
488 let mut followed_authors = self.followed_authors.write().await;
489 *followed_authors = contacts.iter().cloned().collect();
490 }
491
492 let pubkeys: Vec<PublicKey> = contacts
494 .iter()
495 .filter_map(|hex| PublicKey::from_hex(hex).ok())
496 .collect();
497
498 if pubkeys.is_empty() {
499 return Ok(());
500 }
501
502 let filter = Filter::new()
504 .kind(Kind::Custom(30078))
505 .authors(pubkeys.clone())
506 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
507
508 match self.client.subscribe(vec![filter], None).await {
509 Ok(_) => {
510 info!("Subscribed to {} followed users' trees", pubkeys.len());
511 }
512 Err(e) => {
513 warn!(
514 "Failed to subscribe to followed trees (will retry on reconnect): {}",
515 e
516 );
517 }
518 }
519
520 Ok(())
521 }
522
523 async fn handle_tree_event(
525 &self,
526 event: &Event,
527 subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
528 queue: &Arc<RwLock<VecDeque<SyncTask>>>,
529 ) {
530 let has_hashtree_tag = event.tags.iter().any(|tag| {
532 let v = tag.as_slice();
533 v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
534 });
535
536 if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
537 return;
538 }
539
540 let d_tag = event.tags.iter().find_map(|tag| {
542 if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
543 Some(id.clone())
544 } else {
545 None
546 }
547 });
548
549 let tree_name = match d_tag {
550 Some(name) => name,
551 None => return,
552 };
553
554 let mut hash_hex: Option<String> = None;
556 let mut key_hex: Option<String> = None;
557
558 for tag in event.tags.iter() {
559 let tag_vec = tag.as_slice();
560 if tag_vec.len() >= 2 {
561 match tag_vec[0].as_str() {
562 "hash" => hash_hex = Some(tag_vec[1].clone()),
563 "key" => key_hex = Some(tag_vec[1].clone()),
564 _ => {}
565 }
566 }
567 }
568
569 let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
570 Some(h) => h,
571 None => return,
572 };
573
574 let key = key_hex.and_then(|k| {
575 let bytes = hex::decode(&k).ok()?;
576 if bytes.len() == 32 {
577 let mut arr = [0u8; 32];
578 arr.copy_from_slice(&bytes);
579 Some(arr)
580 } else {
581 None
582 }
583 });
584
585 let cid = Cid { hash, key };
586
587 let npub = event
589 .pubkey
590 .to_bech32()
591 .unwrap_or_else(|_| event.pubkey.to_hex());
592 let key = format!("{}/{}", npub, tree_name);
593
594 let author_hex = event.pubkey.to_hex();
595 let pinned_refs = self.pinned_refs.read().await.clone();
596 let followed_authors = self.followed_authors.read().await.clone();
597
598 let Some(priority) = classify_sync_event(
600 &key,
601 &author_hex,
602 &self.my_pubkey,
603 &pinned_refs,
604 &followed_authors,
605 ) else {
606 return;
607 };
608
609 let should_sync = {
611 let mut subs = subscriptions.write().await;
612 let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
613 key: key.clone(),
614 current_cid: None,
615 priority,
616 last_synced: None,
617 });
618
619 let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
621 if changed {
622 sub.current_cid = Some(cid.clone());
623 true
624 } else {
625 false
626 }
627 };
628
629 if should_sync {
630 info!(
631 "New tree update: {} -> {}",
632 key,
633 to_hex(&cid.hash)[..12].to_string()
634 );
635
636 let task = SyncTask {
638 key,
639 cid,
640 priority,
641 queued_at: Instant::now(),
642 };
643
644 let mut q = queue.write().await;
645
646 let insert_pos = q
648 .iter()
649 .position(|t| t.priority > task.priority)
650 .unwrap_or(q.len());
651 q.insert(insert_pos, task);
652 }
653 }
654
655 pub fn shutdown(&self) {
657 let _ = self.shutdown_tx.send(true);
658 }
659
660 pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
662 let task = SyncTask {
663 key: key.to_string(),
664 cid,
665 priority,
666 queued_at: Instant::now(),
667 };
668
669 let mut q = self.queue.write().await;
670 let insert_pos = q
671 .iter()
672 .position(|t| t.priority > task.priority)
673 .unwrap_or(q.len());
674 q.insert(insert_pos, task);
675 }
676
677 pub async fn status(&self) -> SyncStatus {
679 let subscriptions = self.subscriptions.read().await;
680 let queue = self.queue.read().await;
681 let syncing = self.syncing.read().await;
682
683 SyncStatus {
684 subscribed_trees: subscriptions.len(),
685 queued_tasks: queue.len(),
686 active_syncs: syncing.len(),
687 }
688 }
689}
690
691#[derive(Debug, Clone)]
693pub struct SyncStatus {
694 pub subscribed_trees: usize,
695 pub queued_tasks: usize,
696 pub active_syncs: usize,
697}
698
699#[cfg(test)]
700mod tests {
701 use super::*;
702 use nostr_sdk::Keys;
703 use std::fs;
704 use tempfile::TempDir;
705
706 fn upload_repo_root(
707 store: &HashtreeStore,
708 base: &std::path::Path,
709 name: &str,
710 body: &str,
711 ) -> Cid {
712 let dir = base.join(name);
713 fs::create_dir_all(&dir).expect("create repo dir");
714 fs::write(dir.join("README.md"), body).expect("write repo file");
715 let cid = store
716 .upload_dir_with_options(&dir, true)
717 .expect("upload repo directory");
718 let cid = Cid::parse(&cid).expect("parse repo cid");
719 store.unpin(&cid.hash).expect("clear upload auto-pin");
720 cid
721 }
722
723 #[test]
724 fn classify_sync_event_ignores_removed_pinned_refs() {
725 let keys = Keys::generate();
726 let author = Keys::generate().public_key();
727 let key = format!("{}/repo", author.to_bech32().expect("author npub"));
728
729 let priority = classify_sync_event(
730 &key,
731 &author.to_hex(),
732 &keys.public_key(),
733 &HashSet::new(),
734 &HashSet::new(),
735 );
736
737 assert_eq!(priority, None);
738 }
739
740 #[test]
741 fn pinned_sync_update_replaces_old_root_pin() {
742 let temp_dir = TempDir::new().expect("temp dir");
743 let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
744 let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
745 let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
746 let repo_key = format!(
747 "{}/repo",
748 Keys::generate()
749 .public_key()
750 .to_bech32()
751 .expect("repo owner npub")
752 );
753
754 let first_task = SyncTask {
755 key: repo_key.clone(),
756 cid: first_cid.clone(),
757 priority: SyncPriority::Pinned,
758 queued_at: Instant::now(),
759 };
760 apply_synced_tree_update(&store, &first_task).expect("apply first sync update");
761
762 assert!(store.is_pinned(&first_cid.hash).expect("first root pinned"));
763 assert_eq!(
764 store.get_tree_ref(&repo_key).expect("first tree ref"),
765 Some(first_cid.hash)
766 );
767
768 let second_task = SyncTask {
769 key: repo_key.clone(),
770 cid: second_cid.clone(),
771 priority: SyncPriority::Pinned,
772 queued_at: Instant::now(),
773 };
774 apply_synced_tree_update(&store, &second_task).expect("apply second sync update");
775
776 assert!(
777 !store
778 .is_pinned(&first_cid.hash)
779 .expect("first root pin status"),
780 "updating a pinned ref should unpin the superseded root"
781 );
782 assert!(store
783 .is_pinned(&second_cid.hash)
784 .expect("second root pinned"));
785 assert_eq!(
786 store.get_tree_ref(&repo_key).expect("updated tree ref"),
787 Some(second_cid.hash)
788 );
789 assert!(
790 store
791 .get_tree_meta(&first_cid.hash)
792 .expect("first meta lookup")
793 .is_none(),
794 "superseded pinned root should be unindexed after update"
795 );
796 }
797}