1use anyhow::Result;
10use git_remote_htree::nostr_client::load_keys;
11use hashtree_core::{from_hex, to_hex, Cid};
12use nostr_sdk::prelude::*;
13use std::collections::{HashMap, HashSet, VecDeque};
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::RwLock;
18use tracing::{error, info, warn};
19
20use crate::fetch::{FetchConfig, Fetcher};
21use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
22use crate::webrtc::WebRTCState;
23
/// Scheduling priority of a sync task.
///
/// The derived `Ord` follows the explicit discriminants. The queue inserts a
/// new task ahead of the first queued task with a greater priority, so lower
/// values are fetched first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// The tree's `owner/name` key is in the set of pinned refs.
    Pinned = 0,
    /// The tree's author is in the set of tracked authors.
    TrackedAuthor = 1,
    /// The tree was published by this node's own public key.
    Own = 2,
    /// The tree's author is in the set of followed authors.
    Followed = 3,
}
36
/// A queued request to fetch one tree root and record it in the local store.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Tree key; event-driven tasks use the form `<npub>/<tree name>`.
    pub key: String,
    /// Root content identifier to fetch.
    pub cid: Cid,
    /// Priority used to order the task within the queue.
    pub priority: SyncPriority,
    /// Time at which the task was created.
    pub queued_at: Instant,
}
49
/// Settings for the background sync service.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Subscribe to trees published by this node's own key.
    pub sync_own: bool,
    /// Subscribe to trees published by authors listed in the contacts file.
    pub sync_followed: bool,
    /// Relay URLs added to the nostr client at construction.
    pub relays: Vec<String>,
    /// Upper bound on the number of trees fetched at the same time.
    pub max_concurrent: usize,
    /// WebRTC fetch timeout in milliseconds (passed to the fetcher).
    pub webrtc_timeout_ms: u64,
    /// Blossom fetch timeout in milliseconds (passed to the fetcher).
    pub blossom_timeout_ms: u64,
}
66
67impl Default for SyncConfig {
68 fn default() -> Self {
69 Self {
70 sync_own: true,
71 sync_followed: true,
72 relays: hashtree_config::DEFAULT_RELAYS
73 .iter()
74 .map(|s| s.to_string())
75 .collect(),
76 max_concurrent: 3,
77 webrtc_timeout_ms: 2000,
78 blossom_timeout_ms: 10000,
79 }
80 }
81}
82
83impl SyncConfig {
84 pub fn from_config(config: &hashtree_config::Config) -> Self {
86 Self {
87 sync_own: true,
88 sync_followed: true,
89 relays: config.nostr.relays.clone(),
90 max_concurrent: 3,
91 webrtc_timeout_ms: 2000,
92 blossom_timeout_ms: 10000,
93 }
94 }
95}
96
/// Last known state of one tree that events have been received for.
// `key` and `last_synced` are written but not read in the code visible here,
// hence the `dead_code` allowance.
#[allow(dead_code)]
struct TreeSubscription {
    /// Tree key in the form `<npub>/<tree name>`.
    key: String,
    /// Root most recently announced for this tree, if any.
    current_cid: Option<Cid>,
    /// Priority assigned when the subscription entry was created.
    priority: SyncPriority,
    /// Initialised to `None`; not updated anywhere in the code visible here.
    last_synced: Option<Instant>,
}
105
106fn build_exact_tree_filter(key: &str) -> Result<Filter> {
107 let (npub, tree_name) = key
108 .split_once('/')
109 .ok_or_else(|| anyhow::anyhow!("Invalid pinned ref key: {}", key))?;
110 let author = PublicKey::from_bech32(npub)
111 .map_err(|_| anyhow::anyhow!("Invalid npub in pinned ref key: {}", key))?;
112
113 Ok(Filter::new()
114 .kind(Kind::Custom(30078))
115 .author(author)
116 .custom_tag(
117 SingleLetterTag::lowercase(Alphabet::D),
118 vec![tree_name.to_string()],
119 )
120 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]))
121}
122
123fn build_author_tree_filter(author: PublicKey) -> Filter {
124 Filter::new()
125 .kind(Kind::Custom(30078))
126 .author(author)
127 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"])
128}
129
130fn load_author_signing_keys() -> HashMap<String, Keys> {
131 load_keys()
132 .into_iter()
133 .filter_map(|stored| {
134 let secret_hex = stored.secret_hex?;
135 let secret_bytes = hex::decode(&secret_hex).ok()?;
136 let secret = SecretKey::from_slice(&secret_bytes).ok()?;
137 Some((stored.pubkey_hex, Keys::new(secret)))
138 })
139 .collect()
140}
141
142fn cid_from_tree_event(event: &Event, author_keys: Option<&Keys>) -> Option<Cid> {
143 let mut hash_hex: Option<String> = None;
144 let mut key_hex: Option<String> = None;
145 let mut encrypted_key: Option<String> = None;
146 let mut self_encrypted_key: Option<String> = None;
147
148 for tag in event.tags.iter() {
149 let tag_vec = tag.as_slice();
150 if tag_vec.len() < 2 {
151 continue;
152 }
153
154 match tag_vec[0].as_str() {
155 "hash" => hash_hex = Some(tag_vec[1].clone()),
156 "key" => key_hex = Some(tag_vec[1].clone()),
157 "encryptedKey" => encrypted_key = Some(tag_vec[1].clone()),
158 "selfEncryptedKey" => self_encrypted_key = Some(tag_vec[1].clone()),
159 _ => {}
160 }
161 }
162
163 let hash = from_hex(&hash_hex?).ok()?;
164
165 if let Some(key_hex) = key_hex {
166 let bytes = hex::decode(&key_hex).ok()?;
167 if bytes.len() != 32 {
168 return None;
169 }
170 let mut key = [0u8; 32];
171 key.copy_from_slice(&bytes);
172 return Some(Cid {
173 hash,
174 key: Some(key),
175 });
176 }
177
178 if let Some(ciphertext) = self_encrypted_key {
179 let keys = author_keys?;
180 if keys.public_key() != event.pubkey {
181 return None;
182 }
183 let key_hex = nip44::decrypt(keys.secret_key(), &event.pubkey, &ciphertext).ok()?;
184 let bytes = hex::decode(&key_hex).ok()?;
185 if bytes.len() != 32 {
186 return None;
187 }
188 let mut key = [0u8; 32];
189 key.copy_from_slice(&bytes);
190 return Some(Cid {
191 hash,
192 key: Some(key),
193 });
194 }
195
196 if encrypted_key.is_some() {
197 return None;
198 }
199
200 Some(Cid { hash, key: None })
201}
202
203fn classify_sync_event(
204 key: &str,
205 author_hex: &str,
206 my_pubkey: &PublicKey,
207 pinned_refs: &HashSet<String>,
208 tracked_authors: &HashSet<String>,
209 followed_authors: &HashSet<String>,
210) -> Option<SyncPriority> {
211 if pinned_refs.contains(key) {
212 return Some(SyncPriority::Pinned);
213 }
214
215 if tracked_authors.contains(author_hex) {
216 return Some(SyncPriority::TrackedAuthor);
217 }
218
219 if author_hex == my_pubkey.to_hex() {
220 return Some(SyncPriority::Own);
221 }
222
223 if followed_authors.contains(author_hex) {
224 return Some(SyncPriority::Followed);
225 }
226
227 None
228}
229
230fn apply_synced_tree_update(store: &HashtreeStore, task: &SyncTask) -> Result<()> {
231 let (owner, name) = task
232 .key
233 .split_once('/')
234 .map(|(o, n)| (o.to_string(), Some(n)))
235 .unwrap_or((task.key.clone(), None));
236
237 let storage_priority = match task.priority {
238 SyncPriority::Pinned | SyncPriority::TrackedAuthor | SyncPriority::Own => PRIORITY_OWN,
239 SyncPriority::Followed => PRIORITY_FOLLOWED,
240 };
241
242 if matches!(
243 task.priority,
244 SyncPriority::Pinned | SyncPriority::TrackedAuthor
245 ) {
246 store.pin(&task.cid.hash)?;
247 }
248
249 store.index_tree(
250 &task.cid.hash,
251 &owner,
252 name,
253 storage_priority,
254 Some(&task.key),
255 )?;
256
257 store.evict_if_needed()?;
258 Ok(())
259}
260
/// Background service that watches relays for tree announcements and fetches
/// the announced roots into the local store.
pub struct BackgroundSync {
    /// Service settings.
    config: SyncConfig,
    /// Local store that fetched trees are written to and indexed in.
    store: Arc<HashtreeStore>,
    /// Optional WebRTC state handed to the fetcher.
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client used for relay subscriptions and notifications.
    client: Client,
    /// Public key of the keys passed to `new`.
    my_pubkey: PublicKey,
    /// Last announced root per tree key.
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Authors taken from the contacts file.
    followed_authors: Arc<RwLock<HashSet<String>>>,
    /// Pinned refs as last listed by the store.
    pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Hex public keys of tracked authors as last listed by the store.
    tracked_authors: Arc<RwLock<HashSet<String>>>,
    /// Pinned refs for which a relay subscription has been created.
    subscribed_pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Tracked authors (hex) for which a relay subscription has been created.
    subscribed_tracked_authors: Arc<RwLock<HashSet<String>>>,
    /// Locally stored keys by public-key hex, used to decrypt self-encrypted
    /// root keys.
    author_signing_keys: Arc<RwLock<HashMap<String, Keys>>>,
    /// Pending sync tasks, ordered by priority.
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Hex root hashes that are currently being fetched.
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Sending side of the shutdown signal.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Receiving side of the shutdown signal; cloned for each loop.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Fetcher used to download trees.
    fetcher: Arc<Fetcher>,
}
294
295impl BackgroundSync {
296 pub async fn new(
298 config: SyncConfig,
299 store: Arc<HashtreeStore>,
300 keys: Keys,
301 webrtc_state: Option<Arc<WebRTCState>>,
302 ) -> Result<Self> {
303 let my_pubkey = keys.public_key();
304 let client = Client::new(keys);
305
306 for relay in &config.relays {
308 if let Err(e) = client.add_relay(relay).await {
309 warn!("Failed to add relay {}: {}", relay, e);
310 }
311 }
312
313 client.connect().await;
315
316 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
317
318 let fetch_config = FetchConfig {
321 webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
322 blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
323 };
324 let fetcher = Arc::new(Fetcher::new(fetch_config));
325
326 Ok(Self {
327 config,
328 store,
329 webrtc_state,
330 client,
331 my_pubkey,
332 subscriptions: Arc::new(RwLock::new(HashMap::new())),
333 followed_authors: Arc::new(RwLock::new(HashSet::new())),
334 pinned_refs: Arc::new(RwLock::new(HashSet::new())),
335 tracked_authors: Arc::new(RwLock::new(HashSet::new())),
336 subscribed_pinned_refs: Arc::new(RwLock::new(HashSet::new())),
337 subscribed_tracked_authors: Arc::new(RwLock::new(HashSet::new())),
338 author_signing_keys: Arc::new(RwLock::new(load_author_signing_keys())),
339 queue: Arc::new(RwLock::new(VecDeque::new())),
340 syncing: Arc::new(RwLock::new(HashSet::new())),
341 shutdown_tx,
342 shutdown_rx,
343 fetcher,
344 })
345 }
346
347 pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
349 info!("Starting background sync service");
350
351 tokio::time::sleep(Duration::from_secs(3)).await;
353
354 self.refresh_author_signing_keys().await;
355 self.refresh_pinned_ref_subscriptions().await?;
356 self.refresh_tracked_author_subscriptions().await?;
357
358 if self.config.sync_own {
360 self.subscribe_own_trees().await?;
361 }
362
363 if self.config.sync_followed {
365 self.subscribe_followed_trees(&contacts_file).await?;
366 }
367
368 let queue = self.queue.clone();
370 let syncing = self.syncing.clone();
371 let store = self.store.clone();
372 let webrtc_state = self.webrtc_state.clone();
373 let fetcher = self.fetcher.clone();
374 let max_concurrent = self.config.max_concurrent;
375 let mut shutdown_rx = self.shutdown_rx.clone();
376
377 tokio::spawn(async move {
379 let mut interval = tokio::time::interval(Duration::from_millis(500));
380
381 loop {
382 tokio::select! {
383 _ = shutdown_rx.changed() => {
384 if *shutdown_rx.borrow() {
385 info!("Sync worker shutting down");
386 break;
387 }
388 }
389 _ = interval.tick() => {
390 let current_syncing = syncing.read().await.len();
392 if current_syncing >= max_concurrent {
393 continue;
394 }
395
396 let task = {
398 let mut q = queue.write().await;
399 q.pop_front()
400 };
401
402 if let Some(task) = task {
403 let hash_hex = to_hex(&task.cid.hash);
404
405 {
407 let mut s = syncing.write().await;
408 if s.contains(&hash_hex) {
409 continue;
410 }
411 s.insert(hash_hex.clone());
412 }
413
414 let syncing_clone = syncing.clone();
416 let store_clone = store.clone();
417 let webrtc_clone = webrtc_state.clone();
418 let fetcher_clone = fetcher.clone();
419
420 tokio::spawn(async move {
421 let result = fetcher_clone.fetch_cid_tree(
422 &store_clone,
423 webrtc_clone.as_ref(),
424 &task.cid,
425 ).await;
426
427 match result {
428 Ok((chunks_fetched, bytes_fetched)) => {
429 if chunks_fetched > 0 {
430 info!(
431 "Synced tree {} ({} chunks, {} bytes)",
432 &hash_hex[..12],
433 chunks_fetched,
434 bytes_fetched
435 );
436 } else {
437 tracing::debug!(
438 "Tree {} already present locally; applying ref update",
439 &hash_hex[..12]
440 );
441 }
442
443 match store_clone.blob_exists(&task.cid.hash) {
444 Ok(true) => {}
445 Ok(false) => {
446 warn!(
447 "Skipping ref update for {} because root {} is still missing locally",
448 task.key,
449 &hash_hex[..12]
450 );
451 syncing_clone.write().await.remove(&hash_hex);
452 return;
453 }
454 Err(err) => {
455 warn!(
456 "Failed to verify synced root {} before indexing {}: {}",
457 &hash_hex[..12],
458 task.key,
459 err
460 );
461 syncing_clone.write().await.remove(&hash_hex);
462 return;
463 }
464 }
465
466 if let Err(e) = apply_synced_tree_update(&store_clone, &task) {
467 warn!("Failed to apply synced tree {}: {}", &hash_hex[..12], e);
468 }
469 }
470 Err(e) => {
471 warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
472 }
473 }
474
475 syncing_clone.write().await.remove(&hash_hex);
477 });
478 }
479 }
480 }
481 }
482 });
483
484 let mut notifications = self.client.notifications();
486 let subscriptions = self.subscriptions.clone();
487 let queue = self.queue.clone();
488 let mut pinned_refresh = tokio::time::interval(Duration::from_secs(5));
489 let mut shutdown_rx = self.shutdown_rx.clone();
490
491 loop {
492 tokio::select! {
493 _ = shutdown_rx.changed() => {
494 if *shutdown_rx.borrow() {
495 info!("Background sync shutting down");
496 break;
497 }
498 }
499 _ = pinned_refresh.tick() => {
500 self.refresh_author_signing_keys().await;
501 if let Err(err) = self.refresh_pinned_ref_subscriptions().await {
502 warn!("Failed to refresh pinned ref subscriptions: {}", err);
503 }
504 if let Err(err) = self.refresh_tracked_author_subscriptions().await {
505 warn!("Failed to refresh tracked author subscriptions: {}", err);
506 }
507 }
508 notification = notifications.recv() => {
509 match notification {
510 Ok(RelayPoolNotification::Event { event, .. }) => {
511 self.handle_tree_event(&event, &subscriptions, &queue).await;
512 }
513 Ok(_) => {}
514 Err(e) => {
515 error!("Notification error: {}", e);
516 break;
517 }
518 }
519 }
520 }
521 }
522
523 Ok(())
524 }
525
526 async fn refresh_pinned_ref_subscriptions(&self) -> Result<()> {
527 let current_refs: HashSet<String> = self.store.list_pinned_refs()?.into_iter().collect();
528 {
529 let mut pinned_refs = self.pinned_refs.write().await;
530 *pinned_refs = current_refs.clone();
531 }
532
533 {
534 let mut subscriptions = self.subscriptions.write().await;
535 subscriptions.retain(|key, sub| {
536 sub.priority != SyncPriority::Pinned || current_refs.contains(key)
537 });
538 }
539
540 let new_refs: Vec<String> = {
541 let subscribed = self.subscribed_pinned_refs.read().await;
542 current_refs
543 .iter()
544 .filter(|key| !subscribed.contains(*key))
545 .cloned()
546 .collect()
547 };
548
549 for key in new_refs {
550 let filter = match build_exact_tree_filter(&key) {
551 Ok(filter) => filter,
552 Err(err) => {
553 warn!("Ignoring invalid pinned ref {}: {}", key, err);
554 continue;
555 }
556 };
557
558 match self.client.subscribe(vec![filter], None).await {
559 Ok(_) => {
560 info!("Subscribed to pinned ref {}", key);
561 self.subscribed_pinned_refs.write().await.insert(key);
562 }
563 Err(err) => {
564 warn!(
565 "Failed to subscribe to pinned ref (will retry on refresh): {}",
566 err
567 );
568 }
569 }
570 }
571
572 Ok(())
573 }
574
575 async fn refresh_author_signing_keys(&self) {
576 let mut author_signing_keys = self.author_signing_keys.write().await;
577 *author_signing_keys = load_author_signing_keys();
578 }
579
580 async fn refresh_tracked_author_subscriptions(&self) -> Result<()> {
581 let tracked_npubs = self.store.list_tracked_authors()?;
582 let parsed_authors: Vec<(String, PublicKey, String)> = tracked_npubs
583 .into_iter()
584 .filter_map(|npub| match PublicKey::from_bech32(&npub) {
585 Ok(pubkey) => Some((npub, pubkey, pubkey.to_hex())),
586 Err(err) => {
587 warn!("Ignoring invalid tracked author {}: {}", npub, err);
588 None
589 }
590 })
591 .collect();
592 let current_authors: HashSet<String> = parsed_authors
593 .iter()
594 .map(|(_, _, author_hex)| author_hex.clone())
595 .collect();
596
597 {
598 let mut tracked_authors = self.tracked_authors.write().await;
599 *tracked_authors = current_authors.clone();
600 }
601
602 {
603 let mut subscriptions = self.subscriptions.write().await;
604 subscriptions.retain(|key, sub| {
605 if sub.priority != SyncPriority::TrackedAuthor {
606 return true;
607 }
608
609 let Some((npub, _)) = key.split_once('/') else {
610 return false;
611 };
612 let Ok(author) = PublicKey::from_bech32(npub) else {
613 return false;
614 };
615 current_authors.contains(&author.to_hex())
616 });
617 }
618
619 let new_authors: Vec<(String, PublicKey)> = {
620 let subscribed = self.subscribed_tracked_authors.read().await;
621 parsed_authors
622 .iter()
623 .filter(|(_, _, author_hex)| !subscribed.contains(author_hex))
624 .map(|(_, pubkey, author_hex)| (author_hex.clone(), *pubkey))
625 .collect()
626 };
627
628 for (author_hex, author) in new_authors {
629 match self
630 .client
631 .subscribe(vec![build_author_tree_filter(author)], None)
632 .await
633 {
634 Ok(_) => {
635 info!(
636 "Subscribed to tracked author {}",
637 author.to_bech32().unwrap_or(author_hex.clone())
638 );
639 self.subscribed_tracked_authors
640 .write()
641 .await
642 .insert(author_hex);
643 }
644 Err(err) => {
645 warn!(
646 "Failed to subscribe to tracked author (will retry on refresh): {}",
647 err
648 );
649 }
650 }
651 }
652
653 Ok(())
654 }
655
656 async fn subscribe_own_trees(&self) -> Result<()> {
658 let filter = build_author_tree_filter(self.my_pubkey);
659
660 match self.client.subscribe(vec![filter], None).await {
661 Ok(_) => {
662 info!(
663 "Subscribed to own trees for {}",
664 self.my_pubkey.to_bech32().unwrap_or_default()
665 );
666 }
667 Err(e) => {
668 warn!(
669 "Failed to subscribe to own trees (will retry on reconnect): {}",
670 e
671 );
672 }
673 }
674
675 Ok(())
676 }
677
678 async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
680 let contacts: Vec<String> = if contacts_file.exists() {
682 let data = std::fs::read_to_string(contacts_file)?;
683 serde_json::from_str(&data).unwrap_or_default()
684 } else {
685 Vec::new()
686 };
687
688 if contacts.is_empty() {
689 self.followed_authors.write().await.clear();
690 info!("No contacts to subscribe to");
691 return Ok(());
692 }
693
694 {
695 let mut followed_authors = self.followed_authors.write().await;
696 *followed_authors = contacts.iter().cloned().collect();
697 }
698
699 let pubkeys: Vec<PublicKey> = contacts
701 .iter()
702 .filter_map(|hex| PublicKey::from_hex(hex).ok())
703 .collect();
704
705 if pubkeys.is_empty() {
706 return Ok(());
707 }
708
709 let filter = Filter::new()
711 .kind(Kind::Custom(30078))
712 .authors(pubkeys.clone())
713 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
714
715 match self.client.subscribe(vec![filter], None).await {
716 Ok(_) => {
717 info!("Subscribed to {} followed users' trees", pubkeys.len());
718 }
719 Err(e) => {
720 warn!(
721 "Failed to subscribe to followed trees (will retry on reconnect): {}",
722 e
723 );
724 }
725 }
726
727 Ok(())
728 }
729
730 async fn handle_tree_event(
732 &self,
733 event: &Event,
734 subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
735 queue: &Arc<RwLock<VecDeque<SyncTask>>>,
736 ) {
737 let has_hashtree_tag = event.tags.iter().any(|tag| {
739 let v = tag.as_slice();
740 v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
741 });
742
743 if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
744 return;
745 }
746
747 let d_tag = event.tags.iter().find_map(|tag| {
749 if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
750 Some(id.clone())
751 } else {
752 None
753 }
754 });
755
756 let tree_name = match d_tag {
757 Some(name) => name,
758 None => return,
759 };
760
761 let npub = event
763 .pubkey
764 .to_bech32()
765 .unwrap_or_else(|_| event.pubkey.to_hex());
766 let key = format!("{}/{}", npub, tree_name);
767
768 let author_hex = event.pubkey.to_hex();
769 let pinned_refs = self.pinned_refs.read().await.clone();
770 let tracked_authors = self.tracked_authors.read().await.clone();
771 let followed_authors = self.followed_authors.read().await.clone();
772
773 let Some(priority) = classify_sync_event(
775 &key,
776 &author_hex,
777 &self.my_pubkey,
778 &pinned_refs,
779 &tracked_authors,
780 &followed_authors,
781 ) else {
782 return;
783 };
784
785 let author_keys = self
786 .author_signing_keys
787 .read()
788 .await
789 .get(&author_hex)
790 .cloned();
791 let Some(cid) = cid_from_tree_event(event, author_keys.as_ref()) else {
792 return;
793 };
794
795 let should_sync = {
797 let mut subs = subscriptions.write().await;
798 let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
799 key: key.clone(),
800 current_cid: None,
801 priority,
802 last_synced: None,
803 });
804
805 let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
807 if changed {
808 sub.current_cid = Some(cid.clone());
809 true
810 } else {
811 false
812 }
813 };
814
815 if should_sync {
816 info!(
817 "New tree update: {} -> {}",
818 key,
819 to_hex(&cid.hash)[..12].to_string()
820 );
821
822 let task = SyncTask {
824 key,
825 cid,
826 priority,
827 queued_at: Instant::now(),
828 };
829
830 let mut q = queue.write().await;
831
832 let insert_pos = q
834 .iter()
835 .position(|t| t.priority > task.priority)
836 .unwrap_or(q.len());
837 q.insert(insert_pos, task);
838 }
839 }
840
841 pub fn shutdown(&self) {
843 let _ = self.shutdown_tx.send(true);
844 }
845
846 pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
848 let task = SyncTask {
849 key: key.to_string(),
850 cid,
851 priority,
852 queued_at: Instant::now(),
853 };
854
855 let mut q = self.queue.write().await;
856 let insert_pos = q
857 .iter()
858 .position(|t| t.priority > task.priority)
859 .unwrap_or(q.len());
860 q.insert(insert_pos, task);
861 }
862
863 pub async fn status(&self) -> SyncStatus {
865 let subscriptions = self.subscriptions.read().await;
866 let queue = self.queue.read().await;
867 let syncing = self.syncing.read().await;
868
869 SyncStatus {
870 subscribed_trees: subscriptions.len(),
871 queued_tasks: queue.len(),
872 active_syncs: syncing.len(),
873 }
874 }
875}
876
/// Point-in-time counters describing the sync service.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees with a recorded subscription entry.
    pub subscribed_trees: usize,
    /// Number of tasks waiting in the queue.
    pub queued_tasks: usize,
    /// Number of root hashes currently being fetched.
    pub active_syncs: usize,
}
884
#[cfg(test)]
mod tests {
    use super::*;
    use nostr_sdk::Keys;
    use std::fs;
    use tempfile::TempDir;

    /// Uploads a one-file directory `base/name` to `store` and returns its
    /// root cid with the upload's automatic pin removed.
    fn upload_repo_root(
        store: &HashtreeStore,
        base: &std::path::Path,
        name: &str,
        body: &str,
    ) -> Cid {
        let repo_dir = base.join(name);
        fs::create_dir_all(&repo_dir).expect("create repo dir");
        fs::write(repo_dir.join("README.md"), body).expect("write repo file");

        let uploaded = store
            .upload_dir_with_options(&repo_dir, true)
            .expect("upload repo directory");
        let cid = Cid::parse(&uploaded).expect("parse repo cid");
        store.unpin(&cid.hash).expect("clear upload auto-pin");
        cid
    }

    /// Returns a `<npub>/repo` key owned by a freshly generated key pair.
    fn fresh_repo_key() -> String {
        let owner_npub = Keys::generate()
            .public_key()
            .to_bech32()
            .expect("repo owner npub");
        format!("{}/repo", owner_npub)
    }

    /// Builds a sync task for `key` pointing at `cid`.
    fn make_task(key: &str, cid: &Cid, priority: SyncPriority) -> SyncTask {
        SyncTask {
            key: key.to_string(),
            cid: cid.clone(),
            priority,
            queued_at: Instant::now(),
        }
    }

    #[test]
    fn classify_sync_event_ignores_removed_pinned_refs() {
        let me = Keys::generate();
        let author = Keys::generate().public_key();
        let key = format!("{}/repo", author.to_bech32().expect("author npub"));
        let nothing = HashSet::new();

        let priority = classify_sync_event(
            &key,
            &author.to_hex(),
            &me.public_key(),
            &nothing,
            &nothing,
            &nothing,
        );

        assert_eq!(priority, None);
    }

    #[test]
    fn classify_sync_event_prioritizes_tracked_authors() {
        let me = Keys::generate();
        let author = Keys::generate().public_key();
        let key = format!("{}/repo", author.to_bech32().expect("author npub"));
        let nothing = HashSet::new();
        let tracked_authors: HashSet<String> = std::iter::once(author.to_hex()).collect();

        let priority = classify_sync_event(
            &key,
            &author.to_hex(),
            &me.public_key(),
            &nothing,
            &tracked_authors,
            &nothing,
        );

        assert_eq!(priority, Some(SyncPriority::TrackedAuthor));
    }

    #[test]
    fn tracked_author_private_event_uses_matching_local_key() {
        let author = Keys::generate();
        let root_hash = [0x11; 32];
        let root_key = [0x22; 32];

        // The author encrypts the root key to themselves.
        let ciphertext = nip44::encrypt(
            author.secret_key(),
            &author.public_key(),
            hex::encode(root_key),
            nip44::Version::V2,
        )
        .expect("encrypt private root key");

        let tags = vec![
            Tag::identifier("backup".to_string()),
            Tag::custom(
                TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
                vec!["hashtree"],
            ),
            Tag::custom(TagKind::Custom("hash".into()), vec![hex::encode(root_hash)]),
            Tag::custom(TagKind::Custom("selfEncryptedKey".into()), vec![ciphertext]),
        ];
        let event = EventBuilder::new(Kind::Custom(30078), "", tags)
            .to_event(&author)
            .expect("sign private root event");

        let cid = cid_from_tree_event(&event, Some(&author)).expect("decrypt tracked private cid");

        assert_eq!(cid.hash, root_hash);
        assert_eq!(cid.key, Some(root_key));
    }

    #[test]
    fn pinned_sync_update_replaces_old_root_pin() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
        let repo_key = fresh_repo_key();

        let first_task = make_task(&repo_key, &first_cid, SyncPriority::Pinned);
        apply_synced_tree_update(&store, &first_task).expect("apply first sync update");

        assert!(store.is_pinned(&first_cid.hash).expect("first root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("first tree ref"),
            Some(first_cid.hash)
        );

        let second_task = make_task(&repo_key, &second_cid, SyncPriority::Pinned);
        apply_synced_tree_update(&store, &second_task).expect("apply second sync update");

        let first_still_pinned = store
            .is_pinned(&first_cid.hash)
            .expect("first root pin status");
        assert!(
            !first_still_pinned,
            "updating a pinned ref should unpin the superseded root"
        );
        assert!(store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("updated tree ref"),
            Some(second_cid.hash)
        );

        let first_meta = store
            .get_tree_meta(&first_cid.hash)
            .expect("first meta lookup");
        assert!(
            first_meta.is_none(),
            "superseded pinned root should be unindexed after update"
        );
    }

    #[test]
    fn tracked_author_sync_update_replaces_old_root_pin() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
        let repo_key = fresh_repo_key();

        let first_task = make_task(&repo_key, &first_cid, SyncPriority::TrackedAuthor);
        apply_synced_tree_update(&store, &first_task).expect("apply first tracked sync update");

        assert!(store.is_pinned(&first_cid.hash).expect("first root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("first tree ref"),
            Some(first_cid.hash)
        );

        let second_task = make_task(&repo_key, &second_cid, SyncPriority::TrackedAuthor);
        apply_synced_tree_update(&store, &second_task).expect("apply second tracked sync update");

        let first_still_pinned = store
            .is_pinned(&first_cid.hash)
            .expect("first root pin status");
        assert!(
            !first_still_pinned,
            "updating a tracked author ref should unpin the superseded root"
        );
        assert!(store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("updated tree ref"),
            Some(second_cid.hash)
        );
    }
}