use anyhow::Result;
use hashtree_core::{from_hex, to_hex, Cid};
use nostr_sdk::prelude::*;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use crate::fetch::{FetchConfig, Fetcher};
use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
use crate::webrtc::WebRTCState;
/// Relative importance of a sync task.
///
/// Lower values sort first: the queue inserts each task ahead of any task with a
/// larger priority value, so pinned refs are fetched before own trees, and own
/// trees before followed users' trees.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum SyncPriority {
    /// A ref explicitly pinned in the local store.
    Pinned = 0,
    /// A tree published by this node's own key.
    Own = 1,
    /// A tree published by an author listed in the contacts file.
    Followed = 2,
}
/// One unit of work for the sync worker: fetch `cid` and record it under `key`.
#[derive(Clone, Debug)]
pub struct SyncTask {
    /// Ref key in `<owner>/<tree name>` form (the owner is an npub for relay events).
    pub key: String,
    /// Root CID of the tree to fetch.
    pub cid: Cid,
    /// Queue ordering and storage priority for this tree.
    pub priority: SyncPriority,
    /// Moment the task was created.
    pub queued_at: Instant,
}
/// Tunables for [`BackgroundSync`].
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Subscribe to trees published by this node's own key.
    pub sync_own: bool,
    /// Subscribe to trees published by the authors in the contacts file.
    pub sync_followed: bool,
    /// Relay URLs the nostr client connects to.
    pub relays: Vec<String>,
    /// Maximum number of tree fetches in flight at once.
    pub max_concurrent: usize,
    /// WebRTC fetch timeout, in milliseconds.
    pub webrtc_timeout_ms: u64,
    /// Blossom fetch timeout, in milliseconds.
    pub blossom_timeout_ms: u64,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
sync_own: true,
sync_followed: true,
relays: hashtree_config::DEFAULT_RELAYS
.iter()
.map(|s| s.to_string())
.collect(),
max_concurrent: 3,
webrtc_timeout_ms: 2000,
blossom_timeout_ms: 10000,
}
}
}
impl SyncConfig {
    /// Builds a sync config from the application config.
    ///
    /// Only the relay list comes from `config.nostr.relays`. Every other field is
    /// taken from [`SyncConfig::default`], so the two constructors cannot drift
    /// apart when a default changes.
    pub fn from_config(config: &hashtree_config::Config) -> Self {
        Self {
            relays: config.nostr.relays.clone(),
            ..Self::default()
        }
    }
}
/// Last-known state of one tree ref seen on the relays.
// `key` and `last_synced` are stored but not read anywhere in this module.
#[allow(dead_code)]
struct TreeSubscription {
    /// Ref key in `<npub>/<tree name>` form.
    key: String,
    /// Most recent root CID observed for this ref, if any.
    current_cid: Option<Cid>,
    /// Classification the ref was last synced with.
    priority: SyncPriority,
    /// Never assigned anything but `None` in this module.
    last_synced: Option<Instant>,
}
/// Builds a relay filter that matches exactly one hashtree ref.
///
/// `key` must look like `<npub>/<tree name>`. It is split at the first `/`, so the
/// tree name may itself contain slashes. The filter selects kind-30078 events by
/// that author whose `d` tag equals the tree name and whose `l` tag is `hashtree`.
///
/// # Errors
/// Fails if `key` has no `/` or its first segment is not a valid bech32 npub.
fn build_exact_tree_filter(key: &str) -> Result<Filter> {
    let Some((npub, tree_name)) = key.split_once('/') else {
        return Err(anyhow::anyhow!("Invalid pinned ref key: {}", key));
    };
    let author = match PublicKey::from_bech32(npub) {
        Ok(pubkey) => pubkey,
        Err(_) => return Err(anyhow::anyhow!("Invalid npub in pinned ref key: {}", key)),
    };
    let identifier_tag = SingleLetterTag::lowercase(Alphabet::D);
    let label_tag = SingleLetterTag::lowercase(Alphabet::L);
    let filter = Filter::new()
        .kind(Kind::Custom(30078))
        .author(author)
        .custom_tag(identifier_tag, vec![tree_name.to_string()])
        .custom_tag(label_tag, vec!["hashtree"]);
    Ok(filter)
}
/// Decides whether a tree event should be synced, and with which priority.
///
/// Checks are applied in precedence order: a pinned `key` wins, then an event
/// authored by `my_pubkey`, then an author present in `followed_authors`
/// (compared as hex strings). Returns `None` when none of them apply.
fn classify_sync_event(
    key: &str,
    author_hex: &str,
    my_pubkey: &PublicKey,
    pinned_refs: &HashSet<String>,
    followed_authors: &HashSet<String>,
) -> Option<SyncPriority> {
    if pinned_refs.contains(key) {
        Some(SyncPriority::Pinned)
    } else if author_hex == my_pubkey.to_hex() {
        Some(SyncPriority::Own)
    } else if followed_authors.contains(author_hex) {
        Some(SyncPriority::Followed)
    } else {
        None
    }
}
/// Records a freshly fetched tree root in the local store.
///
/// `task.key` is split at its first `/` into owner and tree name; a key without
/// `/` is used as the owner with no name. Pinned and own trees are indexed with
/// `PRIORITY_OWN`, followed trees with `PRIORITY_FOLLOWED`, and the ref
/// `task.key` is passed to `index_tree`. Storage eviction runs afterwards.
///
/// For pinned refs the new root is pinned. If the ref previously pointed at a
/// different root, that superseded root is unpinned before re-indexing;
/// otherwise every update of a pinned ref would leave the old root pinned
/// forever.
///
/// # Errors
/// Propagates any store error from the ref lookup, pin/unpin, indexing or
/// eviction.
fn apply_synced_tree_update(store: &HashtreeStore, task: &SyncTask) -> Result<()> {
    let (owner, name) = match task.key.split_once('/') {
        Some((owner, name)) => (owner.to_string(), Some(name)),
        None => (task.key.clone(), None),
    };
    let storage_priority = match task.priority {
        SyncPriority::Pinned | SyncPriority::Own => PRIORITY_OWN,
        SyncPriority::Followed => PRIORITY_FOLLOWED,
    };
    if task.priority == SyncPriority::Pinned {
        // Read the ref's current root before `index_tree` below overwrites it.
        let previous_root = store.get_tree_ref(&task.key)?;
        store.pin(&task.cid.hash)?;
        if let Some(old_root) = previous_root.filter(|old| *old != task.cid.hash) {
            // Release the pin on the superseded root before re-indexing.
            // NOTE(review): presumably `index_tree` only unindexes a replaced root
            // when it is no longer pinned — confirm against HashtreeStore. A root
            // shared with another pinned ref would also lose its pin here.
            if store.is_pinned(&old_root)? {
                store.unpin(&old_root)?;
            }
        }
    }
    store.index_tree(
        &task.cid.hash,
        &owner,
        name,
        storage_priority,
        Some(&task.key),
    )?;
    store.evict_if_needed()?;
    Ok(())
}
/// Keeps local copies of hashtree trees in sync with refs published on nostr relays.
pub struct BackgroundSync {
    /// Settings this service was created with.
    config: SyncConfig,
    /// Local content store that fetched trees are written to and indexed in.
    store: Arc<HashtreeStore>,
    /// Optional WebRTC state handed to the fetcher.
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client used for relay subscriptions and notifications.
    client: Client,
    /// Public key of this node.
    my_pubkey: PublicKey,
    /// Last-seen state per ref key (`<npub>/<tree name>`).
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Hex pubkeys of followed authors.
    followed_authors: Arc<RwLock<HashSet<String>>>,
    /// Ref keys currently pinned in the store.
    pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Pinned ref keys that already have a relay subscription.
    subscribed_pinned_refs: Arc<RwLock<HashSet<String>>>,
    /// Pending sync tasks, kept in priority order.
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Hex root hashes whose fetch is currently in flight.
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Sending `true` here asks the service and its worker to stop.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Receiver side of the shutdown signal; cloned into each loop.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Shared tree fetcher.
    fetcher: Arc<Fetcher>,
}
impl BackgroundSync {
pub async fn new(
config: SyncConfig,
store: Arc<HashtreeStore>,
keys: Keys,
webrtc_state: Option<Arc<WebRTCState>>,
) -> Result<Self> {
let my_pubkey = keys.public_key();
let client = Client::new(keys);
for relay in &config.relays {
if let Err(e) = client.add_relay(relay).await {
warn!("Failed to add relay {}: {}", relay, e);
}
}
client.connect().await;
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let fetch_config = FetchConfig {
webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
};
let fetcher = Arc::new(Fetcher::new(fetch_config));
Ok(Self {
config,
store,
webrtc_state,
client,
my_pubkey,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
followed_authors: Arc::new(RwLock::new(HashSet::new())),
pinned_refs: Arc::new(RwLock::new(HashSet::new())),
subscribed_pinned_refs: Arc::new(RwLock::new(HashSet::new())),
queue: Arc::new(RwLock::new(VecDeque::new())),
syncing: Arc::new(RwLock::new(HashSet::new())),
shutdown_tx,
shutdown_rx,
fetcher,
})
}
pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
info!("Starting background sync service");
tokio::time::sleep(Duration::from_secs(3)).await;
self.refresh_pinned_ref_subscriptions().await?;
if self.config.sync_own {
self.subscribe_own_trees().await?;
}
if self.config.sync_followed {
self.subscribe_followed_trees(&contacts_file).await?;
}
let queue = self.queue.clone();
let syncing = self.syncing.clone();
let store = self.store.clone();
let webrtc_state = self.webrtc_state.clone();
let fetcher = self.fetcher.clone();
let max_concurrent = self.config.max_concurrent;
let mut shutdown_rx = self.shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("Sync worker shutting down");
break;
}
}
_ = interval.tick() => {
let current_syncing = syncing.read().await.len();
if current_syncing >= max_concurrent {
continue;
}
let task = {
let mut q = queue.write().await;
q.pop_front()
};
if let Some(task) = task {
let hash_hex = to_hex(&task.cid.hash);
{
let mut s = syncing.write().await;
if s.contains(&hash_hex) {
continue;
}
s.insert(hash_hex.clone());
}
let syncing_clone = syncing.clone();
let store_clone = store.clone();
let webrtc_clone = webrtc_state.clone();
let fetcher_clone = fetcher.clone();
tokio::spawn(async move {
let result = fetcher_clone.fetch_tree(
&store_clone,
webrtc_clone.as_ref(),
&task.cid.hash,
).await;
match result {
Ok((chunks_fetched, bytes_fetched)) => {
if chunks_fetched > 0 {
info!(
"Synced tree {} ({} chunks, {} bytes)",
&hash_hex[..12],
chunks_fetched,
bytes_fetched
);
} else {
tracing::debug!(
"Tree {} already present locally; applying ref update",
&hash_hex[..12]
);
}
if let Err(e) = apply_synced_tree_update(&store_clone, &task) {
warn!("Failed to apply synced tree {}: {}", &hash_hex[..12], e);
}
}
Err(e) => {
warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
}
}
syncing_clone.write().await.remove(&hash_hex);
});
}
}
}
}
});
let mut notifications = self.client.notifications();
let subscriptions = self.subscriptions.clone();
let queue = self.queue.clone();
let mut pinned_refresh = tokio::time::interval(Duration::from_secs(5));
let mut shutdown_rx = self.shutdown_rx.clone();
loop {
tokio::select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("Background sync shutting down");
break;
}
}
_ = pinned_refresh.tick() => {
if let Err(err) = self.refresh_pinned_ref_subscriptions().await {
warn!("Failed to refresh pinned ref subscriptions: {}", err);
}
}
notification = notifications.recv() => {
match notification {
Ok(RelayPoolNotification::Event { event, .. }) => {
self.handle_tree_event(&event, &subscriptions, &queue).await;
}
Ok(_) => {}
Err(e) => {
error!("Notification error: {}", e);
break;
}
}
}
}
}
Ok(())
}
async fn refresh_pinned_ref_subscriptions(&self) -> Result<()> {
let current_refs: HashSet<String> = self.store.list_pinned_refs()?.into_iter().collect();
{
let mut pinned_refs = self.pinned_refs.write().await;
*pinned_refs = current_refs.clone();
}
{
let mut subscriptions = self.subscriptions.write().await;
subscriptions.retain(|key, sub| {
sub.priority != SyncPriority::Pinned || current_refs.contains(key)
});
}
let new_refs: Vec<String> = {
let subscribed = self.subscribed_pinned_refs.read().await;
current_refs
.iter()
.filter(|key| !subscribed.contains(*key))
.cloned()
.collect()
};
for key in new_refs {
let filter = match build_exact_tree_filter(&key) {
Ok(filter) => filter,
Err(err) => {
warn!("Ignoring invalid pinned ref {}: {}", key, err);
continue;
}
};
match self.client.subscribe(vec![filter], None).await {
Ok(_) => {
info!("Subscribed to pinned ref {}", key);
self.subscribed_pinned_refs.write().await.insert(key);
}
Err(err) => {
warn!(
"Failed to subscribe to pinned ref (will retry on refresh): {}",
err
);
}
}
}
Ok(())
}
async fn subscribe_own_trees(&self) -> Result<()> {
let filter = Filter::new()
.kind(Kind::Custom(30078))
.author(self.my_pubkey)
.custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
match self.client.subscribe(vec![filter], None).await {
Ok(_) => {
info!(
"Subscribed to own trees for {}",
self.my_pubkey.to_bech32().unwrap_or_default()
);
}
Err(e) => {
warn!(
"Failed to subscribe to own trees (will retry on reconnect): {}",
e
);
}
}
Ok(())
}
async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
let contacts: Vec<String> = if contacts_file.exists() {
let data = std::fs::read_to_string(contacts_file)?;
serde_json::from_str(&data).unwrap_or_default()
} else {
Vec::new()
};
if contacts.is_empty() {
self.followed_authors.write().await.clear();
info!("No contacts to subscribe to");
return Ok(());
}
{
let mut followed_authors = self.followed_authors.write().await;
*followed_authors = contacts.iter().cloned().collect();
}
let pubkeys: Vec<PublicKey> = contacts
.iter()
.filter_map(|hex| PublicKey::from_hex(hex).ok())
.collect();
if pubkeys.is_empty() {
return Ok(());
}
let filter = Filter::new()
.kind(Kind::Custom(30078))
.authors(pubkeys.clone())
.custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
match self.client.subscribe(vec![filter], None).await {
Ok(_) => {
info!("Subscribed to {} followed users' trees", pubkeys.len());
}
Err(e) => {
warn!(
"Failed to subscribe to followed trees (will retry on reconnect): {}",
e
);
}
}
Ok(())
}
async fn handle_tree_event(
&self,
event: &Event,
subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
queue: &Arc<RwLock<VecDeque<SyncTask>>>,
) {
let has_hashtree_tag = event.tags.iter().any(|tag| {
let v = tag.as_slice();
v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
});
if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
return;
}
let d_tag = event.tags.iter().find_map(|tag| {
if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
Some(id.clone())
} else {
None
}
});
let tree_name = match d_tag {
Some(name) => name,
None => return,
};
let mut hash_hex: Option<String> = None;
let mut key_hex: Option<String> = None;
for tag in event.tags.iter() {
let tag_vec = tag.as_slice();
if tag_vec.len() >= 2 {
match tag_vec[0].as_str() {
"hash" => hash_hex = Some(tag_vec[1].clone()),
"key" => key_hex = Some(tag_vec[1].clone()),
_ => {}
}
}
}
let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
Some(h) => h,
None => return,
};
let key = key_hex.and_then(|k| {
let bytes = hex::decode(&k).ok()?;
if bytes.len() == 32 {
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
Some(arr)
} else {
None
}
});
let cid = Cid { hash, key };
let npub = event
.pubkey
.to_bech32()
.unwrap_or_else(|_| event.pubkey.to_hex());
let key = format!("{}/{}", npub, tree_name);
let author_hex = event.pubkey.to_hex();
let pinned_refs = self.pinned_refs.read().await.clone();
let followed_authors = self.followed_authors.read().await.clone();
let Some(priority) = classify_sync_event(
&key,
&author_hex,
&self.my_pubkey,
&pinned_refs,
&followed_authors,
) else {
return;
};
let should_sync = {
let mut subs = subscriptions.write().await;
let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
key: key.clone(),
current_cid: None,
priority,
last_synced: None,
});
let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
if changed {
sub.current_cid = Some(cid.clone());
true
} else {
false
}
};
if should_sync {
info!(
"New tree update: {} -> {}",
key,
to_hex(&cid.hash)[..12].to_string()
);
let task = SyncTask {
key,
cid,
priority,
queued_at: Instant::now(),
};
let mut q = queue.write().await;
let insert_pos = q
.iter()
.position(|t| t.priority > task.priority)
.unwrap_or(q.len());
q.insert(insert_pos, task);
}
}
pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(true);
}
pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
let task = SyncTask {
key: key.to_string(),
cid,
priority,
queued_at: Instant::now(),
};
let mut q = self.queue.write().await;
let insert_pos = q
.iter()
.position(|t| t.priority > task.priority)
.unwrap_or(q.len());
q.insert(insert_pos, task);
}
pub async fn status(&self) -> SyncStatus {
let subscriptions = self.subscriptions.read().await;
let queue = self.queue.read().await;
let syncing = self.syncing.read().await;
SyncStatus {
subscribed_trees: subscriptions.len(),
queued_tasks: queue.len(),
active_syncs: syncing.len(),
}
}
}
/// Point-in-time counters reported by `BackgroundSync::status`.
#[derive(Clone, Debug)]
pub struct SyncStatus {
    /// Number of ref keys with a recorded subscription entry.
    pub subscribed_trees: usize,
    /// Number of tasks waiting in the sync queue.
    pub queued_tasks: usize,
    /// Number of tree fetches currently in flight.
    pub active_syncs: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use nostr_sdk::Keys;
    use std::fs;
    use tempfile::TempDir;

    /// Uploads a one-file directory and returns its root CID with the
    /// upload-time auto-pin cleared, so pin state reflects only sync updates.
    fn upload_repo_root(
        store: &HashtreeStore,
        base: &std::path::Path,
        name: &str,
        body: &str,
    ) -> Cid {
        let dir = base.join(name);
        fs::create_dir_all(&dir).expect("create repo dir");
        fs::write(dir.join("README.md"), body).expect("write repo file");
        let cid = store
            .upload_dir_with_options(&dir, true)
            .expect("upload repo directory");
        let cid = Cid::parse(&cid).expect("parse repo cid");
        store.unpin(&cid.hash).expect("clear upload auto-pin");
        cid
    }

    #[test]
    fn classify_sync_event_ignores_removed_pinned_refs() {
        let keys = Keys::generate();
        let author = Keys::generate().public_key();
        let key = format!("{}/repo", author.to_bech32().expect("author npub"));
        let priority = classify_sync_event(
            &key,
            &author.to_hex(),
            &keys.public_key(),
            &HashSet::new(),
            &HashSet::new(),
        );
        assert_eq!(priority, None);
    }

    /// The queue orders work by this precedence, so pin it down explicitly.
    #[test]
    fn classify_sync_event_prefers_pinned_then_own_then_followed() {
        let me = Keys::generate().public_key();
        let friend = Keys::generate().public_key();
        let my_key = format!("{}/repo", me.to_bech32().expect("my npub"));
        let friend_key = format!("{}/repo", friend.to_bech32().expect("friend npub"));
        let pinned: HashSet<String> = HashSet::from([my_key.clone(), friend_key.clone()]);
        let followed: HashSet<String> = HashSet::from([me.to_hex(), friend.to_hex()]);
        let nothing: HashSet<String> = HashSet::new();

        assert_eq!(
            classify_sync_event(&my_key, &me.to_hex(), &me, &pinned, &followed),
            Some(SyncPriority::Pinned)
        );
        assert_eq!(
            classify_sync_event(&friend_key, &friend.to_hex(), &me, &pinned, &followed),
            Some(SyncPriority::Pinned)
        );
        assert_eq!(
            classify_sync_event(&my_key, &me.to_hex(), &me, &nothing, &followed),
            Some(SyncPriority::Own)
        );
        assert_eq!(
            classify_sync_event(&friend_key, &friend.to_hex(), &me, &nothing, &followed),
            Some(SyncPriority::Followed)
        );
    }

    #[test]
    fn pinned_sync_update_replaces_old_root_pin() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::new(temp_dir.path().join("store")).expect("store");
        let first_cid = upload_repo_root(&store, temp_dir.path(), "repo-v1", "version one\n");
        let second_cid = upload_repo_root(&store, temp_dir.path(), "repo-v2", "version two\n");
        let repo_key = format!(
            "{}/repo",
            Keys::generate()
                .public_key()
                .to_bech32()
                .expect("repo owner npub")
        );
        let first_task = SyncTask {
            key: repo_key.clone(),
            cid: first_cid.clone(),
            priority: SyncPriority::Pinned,
            queued_at: Instant::now(),
        };
        apply_synced_tree_update(&store, &first_task).expect("apply first sync update");
        assert!(store.is_pinned(&first_cid.hash).expect("first root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("first tree ref"),
            Some(first_cid.hash)
        );
        let second_task = SyncTask {
            key: repo_key.clone(),
            cid: second_cid.clone(),
            priority: SyncPriority::Pinned,
            queued_at: Instant::now(),
        };
        apply_synced_tree_update(&store, &second_task).expect("apply second sync update");
        assert!(
            !store
                .is_pinned(&first_cid.hash)
                .expect("first root pin status"),
            "updating a pinned ref should unpin the superseded root"
        );
        assert!(store
            .is_pinned(&second_cid.hash)
            .expect("second root pinned"));
        assert_eq!(
            store.get_tree_ref(&repo_key).expect("updated tree ref"),
            Some(second_cid.hash)
        );
        assert!(
            store
                .get_tree_meta(&first_cid.hash)
                .expect("first meta lookup")
                .is_none(),
            "superseded pinned root should be unindexed after update"
        );
    }
}