Skip to main content

hashtree_cli/
sync.rs

//! Background sync service for auto-pulling trees from Nostr
//!
//! Subscribes to:
//! 1. Explicitly pinned mutable refs - highest priority
//! 2. Own trees (all visibility levels) - high priority
//! 3. Followed users' public trees - lower priority
//!
//! Uses WebRTC peers first, falls back to Blossom HTTP servers
8
9use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
21use crate::webrtc::WebRTCState;
22
/// Urgency class attached to every queued sync task.
///
/// Variants are declared from most to least urgent, so with the derived
/// `Ord` a smaller value means "sync this sooner".
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Mutable refs that were pinned explicitly; served before anything else.
    Pinned = 0,
    /// Trees published by our own key.
    Own = 1,
    /// Trees published by followed users; served last.
    Followed = 2,
}
33
34/// A tree to sync
35#[derive(Debug, Clone)]
36pub struct SyncTask {
37    /// Nostr key (npub.../treename)
38    pub key: String,
39    /// Content identifier
40    pub cid: Cid,
41    /// Priority level
42    pub priority: SyncPriority,
43    /// When this task was queued
44    pub queued_at: Instant,
45}
46
/// Tunables for the background sync service.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Whether trees published by our own key are synced
    pub sync_own: bool,
    /// Whether public trees of followed users are synced
    pub sync_followed: bool,
    /// Nostr relay URLs used for subscriptions
    pub relays: Vec<String>,
    /// Upper bound on sync tasks running at the same time
    pub max_concurrent: usize,
    /// WebRTC request timeout, in milliseconds
    pub webrtc_timeout_ms: u64,
    /// Blossom request timeout, in milliseconds
    pub blossom_timeout_ms: u64,
}
63
64impl Default for SyncConfig {
65    fn default() -> Self {
66        Self {
67            sync_own: true,
68            sync_followed: true,
69            relays: hashtree_config::DEFAULT_RELAYS
70                .iter()
71                .map(|s| s.to_string())
72                .collect(),
73            max_concurrent: 3,
74            webrtc_timeout_ms: 2000,
75            blossom_timeout_ms: 10000,
76        }
77    }
78}
79
80impl SyncConfig {
81    /// Create from hashtree_config (respects user's config.toml)
82    pub fn from_config(config: &hashtree_config::Config) -> Self {
83        Self {
84            sync_own: true,
85            sync_followed: true,
86            relays: config.nostr.relays.clone(),
87            max_concurrent: 3,
88            webrtc_timeout_ms: 2000,
89            blossom_timeout_ms: 10000,
90        }
91    }
92}
93
94/// State for a subscribed tree
95#[allow(dead_code)]
96struct TreeSubscription {
97    key: String,
98    current_cid: Option<Cid>,
99    priority: SyncPriority,
100    last_synced: Option<Instant>,
101}
102
103fn build_exact_tree_filter(key: &str) -> Result<Filter> {
104    let (npub, tree_name) = key
105        .split_once('/')
106        .ok_or_else(|| anyhow::anyhow!("Invalid pinned ref key: {}", key))?;
107    let author = PublicKey::from_bech32(npub)
108        .map_err(|_| anyhow::anyhow!("Invalid npub in pinned ref key: {}", key))?;
109
110    Ok(Filter::new()
111        .kind(Kind::Custom(30078))
112        .author(author)
113        .custom_tag(
114            SingleLetterTag::lowercase(Alphabet::D),
115            vec![tree_name.to_string()],
116        )
117        .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]))
118}
119
120fn classify_sync_event(
121    key: &str,
122    author_hex: &str,
123    my_pubkey: &PublicKey,
124    pinned_refs: &HashSet<String>,
125    followed_authors: &HashSet<String>,
126) -> Option<SyncPriority> {
127    if pinned_refs.contains(key) {
128        return Some(SyncPriority::Pinned);
129    }
130
131    if author_hex == my_pubkey.to_hex() {
132        return Some(SyncPriority::Own);
133    }
134
135    if followed_authors.contains(author_hex) {
136        return Some(SyncPriority::Followed);
137    }
138
139    None
140}
141
142fn apply_synced_tree_update(store: &HashtreeStore, task: &SyncTask) -> Result<()> {
143    let (owner, name) = task
144        .key
145        .split_once('/')
146        .map(|(o, n)| (o.to_string(), Some(n)))
147        .unwrap_or((task.key.clone(), None));
148
149    let storage_priority = match task.priority {
150        SyncPriority::Pinned | SyncPriority::Own => PRIORITY_OWN,
151        SyncPriority::Followed => PRIORITY_FOLLOWED,
152    };
153
154    if task.priority == SyncPriority::Pinned {
155        store.pin(&task.cid.hash)?;
156    }
157
158    store.index_tree(
159        &task.cid.hash,
160        &owner,
161        name,
162        storage_priority,
163        Some(&task.key),
164    )?;
165
166    store.evict_if_needed()?;
167    Ok(())
168}
169
170/// Background sync service
171pub struct BackgroundSync {
172    config: SyncConfig,
173    store: Arc<HashtreeStore>,
174    webrtc_state: Option<Arc<WebRTCState>>,
175    /// Nostr client for subscriptions
176    client: Client,
177    /// Our public key
178    my_pubkey: PublicKey,
179    /// Subscribed trees
180    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
181    /// Followed authors that are allowed to generate sync tasks
182    followed_authors: Arc<RwLock<HashSet<String>>>,
183    /// Currently pinned mutable refs that should keep following updates
184    pinned_refs: Arc<RwLock<HashSet<String>>>,
185    /// Exact pinned refs already subscribed at the relay layer
186    subscribed_pinned_refs: Arc<RwLock<HashSet<String>>>,
187    /// Sync queue
188    queue: Arc<RwLock<VecDeque<SyncTask>>>,
189    /// Currently syncing hashes
190    syncing: Arc<RwLock<HashSet<String>>>,
191    /// Shutdown signal
192    shutdown_tx: tokio::sync::watch::Sender<bool>,
193    shutdown_rx: tokio::sync::watch::Receiver<bool>,
194    /// Fetcher for remote content
195    fetcher: Arc<Fetcher>,
196}
197
198impl BackgroundSync {
199    /// Create a new background sync service
200    pub async fn new(
201        config: SyncConfig,
202        store: Arc<HashtreeStore>,
203        keys: Keys,
204        webrtc_state: Option<Arc<WebRTCState>>,
205    ) -> Result<Self> {
206        let my_pubkey = keys.public_key();
207        let client = Client::new(keys);
208
209        // Add relays
210        for relay in &config.relays {
211            if let Err(e) = client.add_relay(relay).await {
212                warn!("Failed to add relay {}: {}", relay, e);
213            }
214        }
215
216        // Connect to relays
217        client.connect().await;
218
219        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
220
221        // Create fetcher with config
222        // BlossomClient auto-loads servers from ~/.hashtree/config.toml
223        let fetch_config = FetchConfig {
224            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
225            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
226        };
227        let fetcher = Arc::new(Fetcher::new(fetch_config));
228
229        Ok(Self {
230            config,
231            store,
232            webrtc_state,
233            client,
234            my_pubkey,
235            subscriptions: Arc::new(RwLock::new(HashMap::new())),
236            followed_authors: Arc::new(RwLock::new(HashSet::new())),
237            pinned_refs: Arc::new(RwLock::new(HashSet::new())),
238            subscribed_pinned_refs: Arc::new(RwLock::new(HashSet::new())),
239            queue: Arc::new(RwLock::new(VecDeque::new())),
240            syncing: Arc::new(RwLock::new(HashSet::new())),
241            shutdown_tx,
242            shutdown_rx,
243            fetcher,
244        })
245    }
246
247    /// Start the background sync service
248    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
249        info!("Starting background sync service");
250
251        // Wait for relays to connect before subscribing
252        tokio::time::sleep(Duration::from_secs(3)).await;
253
254        self.refresh_pinned_ref_subscriptions().await?;
255
256        // Subscribe to own trees
257        if self.config.sync_own {
258            self.subscribe_own_trees().await?;
259        }
260
261        // Subscribe to followed users' trees
262        if self.config.sync_followed {
263            self.subscribe_followed_trees(&contacts_file).await?;
264        }
265
266        // Start sync worker
267        let queue = self.queue.clone();
268        let syncing = self.syncing.clone();
269        let store = self.store.clone();
270        let webrtc_state = self.webrtc_state.clone();
271        let fetcher = self.fetcher.clone();
272        let max_concurrent = self.config.max_concurrent;
273        let mut shutdown_rx = self.shutdown_rx.clone();
274
275        // Spawn sync worker task
276        tokio::spawn(async move {
277            let mut interval = tokio::time::interval(Duration::from_millis(500));
278
279            loop {
280                tokio::select! {
281                    _ = shutdown_rx.changed() => {
282                        if *shutdown_rx.borrow() {
283                            info!("Sync worker shutting down");
284                            break;
285                        }
286                    }
287                    _ = interval.tick() => {
288                        // Check if we can start more sync tasks
289                        let current_syncing = syncing.read().await.len();
290                        if current_syncing >= max_concurrent {
291                            continue;
292                        }
293
294                        // Get next task from queue
295                        let task = {
296                            let mut q = queue.write().await;
297                            q.pop_front()
298                        };
299
300                        if let Some(task) = task {
301                            let hash_hex = to_hex(&task.cid.hash);
302
303                            // Check if already syncing
304                            {
305                                let mut s = syncing.write().await;
306                                if s.contains(&hash_hex) {
307                                    continue;
308                                }
309                                s.insert(hash_hex.clone());
310                            }
311
312                            // Spawn sync task
313                            let syncing_clone = syncing.clone();
314                            let store_clone = store.clone();
315                            let webrtc_clone = webrtc_state.clone();
316                            let fetcher_clone = fetcher.clone();
317
318                            tokio::spawn(async move {
319                                let result = fetcher_clone.fetch_tree(
320                                    &store_clone,
321                                    webrtc_clone.as_ref(),
322                                    &task.cid.hash,
323                                ).await;
324
325                                match result {
326                                    Ok((chunks_fetched, bytes_fetched)) => {
327                                        if chunks_fetched > 0 {
328                                            info!(
329                                                "Synced tree {} ({} chunks, {} bytes)",
330                                                &hash_hex[..12],
331                                                chunks_fetched,
332                                                bytes_fetched
333                                            );
334                                        } else {
335                                            tracing::debug!(
336                                                "Tree {} already present locally; applying ref update",
337                                                &hash_hex[..12]
338                                            );
339                                        }
340
341                                        if let Err(e) = apply_synced_tree_update(&store_clone, &task) {
342                                            warn!("Failed to apply synced tree {}: {}", &hash_hex[..12], e);
343                                        }
344                                    }
345                                    Err(e) => {
346                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
347                                    }
348                                }
349
350                                // Remove from syncing set
351                                syncing_clone.write().await.remove(&hash_hex);
352                            });
353                        }
354                    }
355                }
356            }
357        });
358
359        // Handle Nostr notifications for tree updates
360        let mut notifications = self.client.notifications();
361        let subscriptions = self.subscriptions.clone();
362        let queue = self.queue.clone();
363        let mut pinned_refresh = tokio::time::interval(Duration::from_secs(5));
364        let mut shutdown_rx = self.shutdown_rx.clone();
365
366        loop {
367            tokio::select! {
368                _ = shutdown_rx.changed() => {
369                    if *shutdown_rx.borrow() {
370                        info!("Background sync shutting down");
371                        break;
372                    }
373                }
374                _ = pinned_refresh.tick() => {
375                    if let Err(err) = self.refresh_pinned_ref_subscriptions().await {
376                        warn!("Failed to refresh pinned ref subscriptions: {}", err);
377                    }
378                }
379                notification = notifications.recv() => {
380                    match notification {
381                        Ok(RelayPoolNotification::Event { event, .. }) => {
382                            self.handle_tree_event(&event, &subscriptions, &queue).await;
383                        }
384                        Ok(_) => {}
385                        Err(e) => {
386                            error!("Notification error: {}", e);
387                            break;
388                        }
389                    }
390                }
391            }
392        }
393
394        Ok(())
395    }
396
397    async fn refresh_pinned_ref_subscriptions(&self) -> Result<()> {
398        let current_refs: HashSet<String> = self.store.list_pinned_refs()?.into_iter().collect();
399        {
400            let mut pinned_refs = self.pinned_refs.write().await;
401            *pinned_refs = current_refs.clone();
402        }
403
404        {
405            let mut subscriptions = self.subscriptions.write().await;
406            subscriptions.retain(|key, sub| {
407                sub.priority != SyncPriority::Pinned || current_refs.contains(key)
408            });
409        }
410
411        let new_refs: Vec<String> = {
412            let subscribed = self.subscribed_pinned_refs.read().await;
413            current_refs
414                .iter()
415                .filter(|key| !subscribed.contains(*key))
416                .cloned()
417                .collect()
418        };
419
420        for key in new_refs {
421            let filter = match build_exact_tree_filter(&key) {
422                Ok(filter) => filter,
423                Err(err) => {
424                    warn!("Ignoring invalid pinned ref {}: {}", key, err);
425                    continue;
426                }
427            };
428
429            match self.client.subscribe(vec![filter], None).await {
430                Ok(_) => {
431                    info!("Subscribed to pinned ref {}", key);
432                    self.subscribed_pinned_refs.write().await.insert(key);
433                }
434                Err(err) => {
435                    warn!(
436                        "Failed to subscribe to pinned ref (will retry on refresh): {}",
437                        err
438                    );
439                }
440            }
441        }
442
443        Ok(())
444    }
445
446    /// Subscribe to own trees (kind 30078 events from our pubkey)
447    async fn subscribe_own_trees(&self) -> Result<()> {
448        let filter = Filter::new()
449            .kind(Kind::Custom(30078))
450            .author(self.my_pubkey)
451            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
452
453        match self.client.subscribe(vec![filter], None).await {
454            Ok(_) => {
455                info!(
456                    "Subscribed to own trees for {}",
457                    self.my_pubkey.to_bech32().unwrap_or_default()
458                );
459            }
460            Err(e) => {
461                warn!(
462                    "Failed to subscribe to own trees (will retry on reconnect): {}",
463                    e
464                );
465            }
466        }
467
468        Ok(())
469    }
470
471    /// Subscribe to followed users' trees
472    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
473        // Load contacts from file
474        let contacts: Vec<String> = if contacts_file.exists() {
475            let data = std::fs::read_to_string(contacts_file)?;
476            serde_json::from_str(&data).unwrap_or_default()
477        } else {
478            Vec::new()
479        };
480
481        if contacts.is_empty() {
482            self.followed_authors.write().await.clear();
483            info!("No contacts to subscribe to");
484            return Ok(());
485        }
486
487        {
488            let mut followed_authors = self.followed_authors.write().await;
489            *followed_authors = contacts.iter().cloned().collect();
490        }
491
492        // Convert hex pubkeys to PublicKey
493        let pubkeys: Vec<PublicKey> = contacts
494            .iter()
495            .filter_map(|hex| PublicKey::from_hex(hex).ok())
496            .collect();
497
498        if pubkeys.is_empty() {
499            return Ok(());
500        }
501
502        // Subscribe to all followed users' hashtree events
503        let filter = Filter::new()
504            .kind(Kind::Custom(30078))
505            .authors(pubkeys.clone())
506            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
507
508        match self.client.subscribe(vec![filter], None).await {
509            Ok(_) => {
510                info!("Subscribed to {} followed users' trees", pubkeys.len());
511            }
512            Err(e) => {
513                warn!(
514                    "Failed to subscribe to followed trees (will retry on reconnect): {}",
515                    e
516                );
517            }
518        }
519
520        Ok(())
521    }
522
523    /// Handle incoming tree event
524    async fn handle_tree_event(
525        &self,
526        event: &Event,
527        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
528        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
529    ) {
530        // Check if it's a hashtree event
531        let has_hashtree_tag = event.tags.iter().any(|tag| {
532            let v = tag.as_slice();
533            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
534        });
535
536        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
537            return;
538        }
539
540        // Extract d-tag (tree name)
541        let d_tag = event.tags.iter().find_map(|tag| {
542            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
543                Some(id.clone())
544            } else {
545                None
546            }
547        });
548
549        let tree_name = match d_tag {
550            Some(name) => name,
551            None => return,
552        };
553
554        // Extract hash and key from tags
555        let mut hash_hex: Option<String> = None;
556        let mut key_hex: Option<String> = None;
557
558        for tag in event.tags.iter() {
559            let tag_vec = tag.as_slice();
560            if tag_vec.len() >= 2 {
561                match tag_vec[0].as_str() {
562                    "hash" => hash_hex = Some(tag_vec[1].clone()),
563                    "key" => key_hex = Some(tag_vec[1].clone()),
564                    _ => {}
565                }
566            }
567        }
568
569        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
570            Some(h) => h,
571            None => return,
572        };
573
574        let key = key_hex.and_then(|k| {
575            let bytes = hex::decode(&k).ok()?;
576            if bytes.len() == 32 {
577                let mut arr = [0u8; 32];
578                arr.copy_from_slice(&bytes);
579                Some(arr)
580            } else {
581                None
582            }
583        });
584
585        let cid = Cid { hash, key };
586
587        // Build key
588        let npub = event
589            .pubkey
590            .to_bech32()
591            .unwrap_or_else(|_| event.pubkey.to_hex());
592        let key = format!("{}/{}", npub, tree_name);
593
594        let author_hex = event.pubkey.to_hex();
595        let pinned_refs = self.pinned_refs.read().await.clone();
596        let followed_authors = self.followed_authors.read().await.clone();
597
598        // Determine priority and ignore stale events from refs we no longer care about.
599        let Some(priority) = classify_sync_event(
600            &key,
601            &author_hex,
602            &self.my_pubkey,
603            &pinned_refs,
604            &followed_authors,
605        ) else {
606            return;
607        };
608
609        // Check if we need to sync
610        let should_sync = {
611            let mut subs = subscriptions.write().await;
612            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
613                key: key.clone(),
614                current_cid: None,
615                priority,
616                last_synced: None,
617            });
618
619            // Check if CID changed
620            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
621            if changed {
622                sub.current_cid = Some(cid.clone());
623                true
624            } else {
625                false
626            }
627        };
628
629        if should_sync {
630            info!(
631                "New tree update: {} -> {}",
632                key,
633                to_hex(&cid.hash)[..12].to_string()
634            );
635
636            // Add to sync queue
637            let task = SyncTask {
638                key,
639                cid,
640                priority,
641                queued_at: Instant::now(),
642            };
643
644            let mut q = queue.write().await;
645
646            // Insert based on priority (own trees first)
647            let insert_pos = q
648                .iter()
649                .position(|t| t.priority > task.priority)
650                .unwrap_or(q.len());
651            q.insert(insert_pos, task);
652        }
653    }
654
655    /// Signal shutdown
656    pub fn shutdown(&self) {
657        let _ = self.shutdown_tx.send(true);
658    }
659
660    /// Queue a manual sync for a specific tree
661    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
662        let task = SyncTask {
663            key: key.to_string(),
664            cid,
665            priority,
666            queued_at: Instant::now(),
667        };
668
669        let mut q = self.queue.write().await;
670        let insert_pos = q
671            .iter()
672            .position(|t| t.priority > task.priority)
673            .unwrap_or(q.len());
674        q.insert(insert_pos, task);
675    }
676
677    /// Get current sync status
678    pub async fn status(&self) -> SyncStatus {
679        let subscriptions = self.subscriptions.read().await;
680        let queue = self.queue.read().await;
681        let syncing = self.syncing.read().await;
682
683        SyncStatus {
684            subscribed_trees: subscriptions.len(),
685            queued_tasks: queue.len(),
686            active_syncs: syncing.len(),
687        }
688    }
689}
690
/// Point-in-time counters describing the sync service.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees the service has recorded a subscription entry for
    pub subscribed_trees: usize,
    /// Number of tasks waiting in the sync queue
    pub queued_tasks: usize,
    /// Number of trees being fetched right now
    pub active_syncs: usize,
}
698
#[cfg(test)]
mod tests {
    use super::*;
    use nostr_sdk::Keys;
    use std::fs;
    use tempfile::TempDir;

    /// Create a one-file repo directory under `base`, upload it to `store`
    /// and return its CID with the pin removed again, so each test starts
    /// from an unpinned root.
    fn upload_repo_root(
        store: &HashtreeStore,
        base: &std::path::Path,
        name: &str,
        body: &str,
    ) -> Cid {
        let repo_dir = base.join(name);
        fs::create_dir_all(&repo_dir).expect("create repo dir");
        fs::write(repo_dir.join("README.md"), body).expect("write repo file");

        let uploaded = store
            .upload_dir_with_options(&repo_dir, true)
            .expect("upload repo directory");
        let cid = Cid::parse(&uploaded).expect("parse repo cid");
        store.unpin(&cid.hash).expect("clear upload auto-pin");
        cid
    }

    /// An author that is neither pinned, ourselves, nor followed yields no task.
    #[test]
    fn classify_sync_event_ignores_removed_pinned_refs() {
        let me = Keys::generate().public_key();
        let author = Keys::generate().public_key();
        let key = format!("{}/repo", author.to_bech32().expect("author npub"));
        let no_pinned_refs = HashSet::new();
        let no_followed_authors = HashSet::new();

        let priority = classify_sync_event(
            &key,
            &author.to_hex(),
            &me,
            &no_pinned_refs,
            &no_followed_authors,
        );

        assert_eq!(priority, None);
    }

    /// Applying a second pinned update for the same ref must move the pin,
    /// the tree ref and the index entry from the old root to the new one.
    #[test]
    fn pinned_sync_update_replaces_old_root_pin() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");

        let owner_npub = Keys::generate()
            .public_key()
            .to_bech32()
            .expect("repo owner npub");
        let repo_key = format!("{}/repo", owner_npub);

        // Both updates target the same ref at pinned priority.
        let pinned_task = |cid: &Cid| SyncTask {
            key: repo_key.clone(),
            cid: cid.clone(),
            priority: SyncPriority::Pinned,
            queued_at: Instant::now(),
        };

        apply_synced_tree_update(&store, &pinned_task(&first_cid))
            .expect("apply first sync update");

        assert!(store.is_pinned(&first_cid.hash).expect("first root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("first tree ref"),
            Some(first_cid.hash)
        );

        apply_synced_tree_update(&store, &pinned_task(&second_cid))
            .expect("apply second sync update");

        let first_still_pinned = store
            .is_pinned(&first_cid.hash)
            .expect("first root pin status");
        assert!(
            !first_still_pinned,
            "updating a pinned ref should unpin the superseded root"
        );
        assert!(store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("updated tree ref"),
            Some(second_cid.hash)
        );

        let first_meta = store
            .get_tree_meta(&first_cid.hash)
            .expect("first meta lookup");
        assert!(
            first_meta.is_none(),
            "superseded pinned root should be unindexed after update"
        );
    }
}