1use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_OWN, PRIORITY_FOLLOWED};
21use crate::webrtc::WebRTCState;
22
/// Relative importance of a sync task. Lower discriminant sorts first under
/// the derived `Ord`, so `Own` outranks `Followed` in the priority queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Trees published by our own key — highest priority.
    Own = 0,
    /// Trees published by followed contacts.
    Followed = 1,
}
31
/// A unit of work for the background sync worker: one tree root to fetch.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Tree identifier, formatted `<owner>/<name>` (split on `/` when indexing).
    pub key: String,
    /// Root content identifier: hash plus an optional 32-byte key.
    pub cid: Cid,
    /// Determines insertion position in the queue (higher priority runs first).
    pub priority: SyncPriority,
    /// When the task entered the queue.
    pub queued_at: Instant,
}
44
/// Tunable settings for the background sync service.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Subscribe to trees published by our own key.
    pub sync_own: bool,
    /// Subscribe to trees published by followed contacts.
    pub sync_followed: bool,
    /// Nostr relay URLs to register with the client.
    pub relays: Vec<String>,
    /// Maximum number of tree syncs in flight at once.
    pub max_concurrent: usize,
    /// WebRTC fetch timeout, in milliseconds.
    pub webrtc_timeout_ms: u64,
    /// Blossom fetch timeout, in milliseconds.
    pub blossom_timeout_ms: u64,
}
61
62impl Default for SyncConfig {
63 fn default() -> Self {
64 Self {
65 sync_own: true,
66 sync_followed: true,
67 relays: hashtree_config::DEFAULT_RELAYS
68 .iter()
69 .map(|s| s.to_string())
70 .collect(),
71 max_concurrent: 3,
72 webrtc_timeout_ms: 2000,
73 blossom_timeout_ms: 10000,
74 }
75 }
76}
77
78impl SyncConfig {
79 pub fn from_config(config: &hashtree_config::Config) -> Self {
81 Self {
82 sync_own: true,
83 sync_followed: true,
84 relays: config.nostr.relays.clone(),
85 max_concurrent: 3,
86 webrtc_timeout_ms: 2000,
87 blossom_timeout_ms: 10000,
88 }
89 }
90}
91
/// Last-known state for a tree we track via relay events.
#[allow(dead_code)]
struct TreeSubscription {
    /// Tree identifier (`<owner>/<name>`).
    key: String,
    /// Root CID from the most recent event, if any seen yet.
    current_cid: Option<Cid>,
    /// Priority assigned when the subscription entry was created.
    priority: SyncPriority,
    /// Completion timestamp of the last sync.
    /// NOTE(review): never written anywhere in this file — confirm intent.
    last_synced: Option<Instant>,
}
100
/// Background service that watches Nostr for hashtree announcements
/// (kind 30078 labelled `l=hashtree`) and fetches/indexes updated trees.
pub struct BackgroundSync {
    /// Service settings (relays, limits, timeouts).
    config: SyncConfig,
    /// Local chunk/tree store used for fetching and indexing.
    store: Arc<HashtreeStore>,
    /// Optional WebRTC transport handed to the fetcher.
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Nostr client used for subscriptions and notifications.
    client: Client,
    /// Our public key; events from it get `SyncPriority::Own`.
    my_pubkey: PublicKey,
    /// Tracked trees keyed by `<owner>/<name>`.
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    /// Priority-ordered work queue consumed by the worker task.
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    /// Hex hashes currently being fetched (deduplication + concurrency cap).
    syncing: Arc<RwLock<HashSet<String>>>,
    /// Sender half of the shutdown signal.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Receiver half, cloned into each loop.
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    /// Shared fetcher configured with the sync timeouts.
    fetcher: Arc<Fetcher>,
}
122
impl BackgroundSync {
    /// Create the service: build a Nostr client for `keys`, register and
    /// connect the configured relays, and set up the fetcher plus the
    /// shutdown watch channel. Relay registration failures are logged and
    /// skipped rather than treated as fatal.
    pub async fn new(
        config: SyncConfig,
        store: Arc<HashtreeStore>,
        keys: Keys,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Result<Self> {
        let my_pubkey = keys.public_key();
        let client = Client::new(keys);

        // Best-effort: one bad relay URL must not prevent startup.
        for relay in &config.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }

        client.connect().await;

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        // Fetcher shares the timeouts configured on SyncConfig.
        let fetch_config = FetchConfig {
            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
        };
        let fetcher = Arc::new(Fetcher::new(fetch_config));

        Ok(Self {
            config,
            store,
            webrtc_state,
            client,
            my_pubkey,
            subscriptions: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(RwLock::new(VecDeque::new())),
            syncing: Arc::new(RwLock::new(HashSet::new())),
            shutdown_tx,
            shutdown_rx,
            fetcher,
        })
    }

    /// Main entry point: subscribe to own/followed trees (per config), spawn
    /// the queue worker, then route relay notifications into the queue until
    /// shutdown is signalled or the notification stream errors out.
    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
        info!("Starting background sync service");

        // Give relay connections a moment to establish before subscribing.
        tokio::time::sleep(Duration::from_secs(3)).await;

        if self.config.sync_own {
            self.subscribe_own_trees().await?;
        }

        if self.config.sync_followed {
            self.subscribe_followed_trees(&contacts_file).await?;
        }

        // Clones captured by the detached worker task below.
        let queue = self.queue.clone();
        let syncing = self.syncing.clone();
        let store = self.store.clone();
        let webrtc_state = self.webrtc_state.clone();
        let fetcher = self.fetcher.clone();
        let max_concurrent = self.config.max_concurrent;
        let mut shutdown_rx = self.shutdown_rx.clone();

        // Worker: every 500 ms pop at most one task (if below the concurrency
        // cap) and fetch/index that tree on its own spawned task.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            info!("Sync worker shutting down");
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        // Enforce the concurrency limit before dequeuing.
                        let current_syncing = syncing.read().await.len();
                        if current_syncing >= max_concurrent {
                            continue;
                        }

                        // Hold the queue lock only long enough to pop.
                        let task = {
                            let mut q = queue.write().await;
                            q.pop_front()
                        };

                        if let Some(task) = task {
                            let hash_hex = to_hex(&task.cid.hash);

                            // Drop the task if this hash is already in flight;
                            // otherwise claim a slot before spawning.
                            {
                                let mut s = syncing.write().await;
                                if s.contains(&hash_hex) {
                                    continue;
                                }
                                s.insert(hash_hex.clone());
                            }

                            let syncing_clone = syncing.clone();
                            let store_clone = store.clone();
                            let webrtc_clone = webrtc_state.clone();
                            let fetcher_clone = fetcher.clone();

                            tokio::spawn(async move {
                                let result = fetcher_clone.fetch_tree(
                                    &store_clone,
                                    webrtc_clone.as_ref(),
                                    &task.cid.hash,
                                ).await;

                                match result {
                                    Ok((chunks_fetched, bytes_fetched)) => {
                                        if chunks_fetched > 0 {
                                            info!(
                                                "Synced tree {} ({} chunks, {} bytes)",
                                                &hash_hex[..12],
                                                chunks_fetched,
                                                bytes_fetched
                                            );

                                            // Task keys look like "<owner>/<name>";
                                            // a key with no '/' is owner-only.
                                            let (owner, name) = task.key.split_once('/')
                                                .map(|(o, n)| (o.to_string(), Some(n)))
                                                .unwrap_or((task.key.clone(), None));

                                            let storage_priority = match task.priority {
                                                SyncPriority::Own => PRIORITY_OWN,
                                                SyncPriority::Followed => PRIORITY_FOLLOWED,
                                            };

                                            if let Err(e) = store_clone.index_tree(
                                                &task.cid.hash,
                                                &owner,
                                                name,
                                                storage_priority,
                                                Some(&task.key),
                                            ) {
                                                warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
                                            }

                                            // Newly indexed data may push the store
                                            // over budget; check eviction now.
                                            if let Err(e) = store_clone.evict_if_needed() {
                                                warn!("Eviction check failed: {}", e);
                                            }
                                        } else {
                                            tracing::debug!("Tree {} already synced", &hash_hex[..12]);
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
                                    }
                                }

                                // Always release the in-flight slot, success or failure.
                                syncing_clone.write().await.remove(&hash_hex);
                            });
                        }
                    }
                }
            }
        });

        // Notification loop: feed matching relay events into the queue.
        let mut notifications = self.client.notifications();
        let subscriptions = self.subscriptions.clone();
        let queue = self.queue.clone();
        let mut shutdown_rx = self.shutdown_rx.clone();

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("Background sync shutting down");
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_tree_event(&event, &subscriptions, &queue).await;
                        }
                        Ok(_) => {}
                        Err(e) => {
                            // NOTE(review): any recv error ends the whole sync loop.
                            // If this channel can report a recoverable "lagged"
                            // condition (broadcast-style receiver), consider
                            // continuing instead of breaking — confirm against
                            // the nostr-sdk notification channel semantics.
                            error!("Notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Subscribe to kind-30078 events authored by our own key and tagged
    /// `l=hashtree`. Subscription failure is logged, not fatal.
    async fn subscribe_own_trees(&self) -> Result<()> {
        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .author(self.my_pubkey)
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!(
                    "Subscribed to own trees for {}",
                    self.my_pubkey.to_bech32().unwrap_or_default()
                );
            }
            Err(e) => {
                warn!("Failed to subscribe to own trees (will retry on reconnect): {}", e);
            }
        }

        Ok(())
    }

    /// Load contact pubkeys (JSON array of hex strings) from `contacts_file`
    /// and subscribe to their kind-30078 `l=hashtree` events. A missing file,
    /// unparseable JSON, or zero valid pubkeys is a quiet no-op; only a file
    /// read error propagates.
    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
        let contacts: Vec<String> = if contacts_file.exists() {
            let data = std::fs::read_to_string(contacts_file)?;
            serde_json::from_str(&data).unwrap_or_default()
        } else {
            Vec::new()
        };

        if contacts.is_empty() {
            info!("No contacts to subscribe to");
            return Ok(());
        }

        // Silently skip entries that are not valid hex pubkeys.
        let pubkeys: Vec<PublicKey> = contacts
            .iter()
            .filter_map(|hex| PublicKey::from_hex(hex).ok())
            .collect();

        if pubkeys.is_empty() {
            return Ok(());
        }

        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .authors(pubkeys.clone())
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!("Subscribed to {} followed users' trees", pubkeys.len());
            }
            Err(e) => {
                warn!("Failed to subscribe to followed trees (will retry on reconnect): {}", e);
            }
        }

        Ok(())
    }

    /// Inspect a relay event and, if it announces a new root hash for a tree,
    /// queue a sync task. Events are ignored unless they are kind 30078 and
    /// carry an `l=hashtree` tag, a `d` (identifier) tag, and a valid hex
    /// `hash` tag.
    async fn handle_tree_event(
        &self,
        event: &Event,
        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
    ) {
        // Only events explicitly labelled "hashtree" are relevant.
        let has_hashtree_tag = event.tags.iter().any(|tag| {
            let v = tag.as_slice();
            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
        });

        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
            return;
        }

        // The "d" tag names the tree; without it we cannot form a key.
        let d_tag = event.tags.iter().find_map(|tag| {
            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
                Some(id.clone())
            } else {
                None
            }
        });

        let tree_name = match d_tag {
            Some(name) => name,
            None => return,
        };

        // Collect the raw "hash" and optional "key" tag values.
        let mut hash_hex: Option<String> = None;
        let mut key_hex: Option<String> = None;

        for tag in event.tags.iter() {
            let tag_vec = tag.as_slice();
            if tag_vec.len() >= 2 {
                match tag_vec[0].as_str() {
                    "hash" => hash_hex = Some(tag_vec[1].clone()),
                    "key" => key_hex = Some(tag_vec[1].clone()),
                    _ => {}
                }
            }
        }

        // A missing or malformed hash makes the event useless.
        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
            Some(h) => h,
            None => return,
        };

        // The optional key must decode to exactly 32 bytes; otherwise drop it.
        let key = key_hex.and_then(|k| {
            let bytes = hex::decode(&k).ok()?;
            if bytes.len() == 32 {
                let mut arr = [0u8; 32];
                arr.copy_from_slice(&bytes);
                Some(arr)
            } else {
                None
            }
        });

        let cid = Cid { hash, key };

        // Tree key is "<npub>/<name>"; fall back to hex pubkey if bech32 fails.
        let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex());
        let key = format!("{}/{}", npub, tree_name);

        let priority = if event.pubkey == self.my_pubkey {
            SyncPriority::Own
        } else {
            SyncPriority::Followed
        };

        // Record/refresh the subscription; only queue work when the root
        // hash actually changed since the last event.
        let should_sync = {
            let mut subs = subscriptions.write().await;
            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
                key: key.clone(),
                current_cid: None,
                priority,
                last_synced: None,
            });

            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
            if changed {
                sub.current_cid = Some(cid.clone());
                true
            } else {
                false
            }
        };

        if should_sync {
            info!("New tree update: {} -> {}", key, to_hex(&cid.hash)[..12].to_string());

            let task = SyncTask {
                key,
                cid,
                priority,
                queued_at: Instant::now(),
            };

            let mut q = queue.write().await;

            // Insert before the first lower-priority task (larger enum value)
            // so the queue stays ordered by priority, FIFO within a priority.
            let insert_pos = q
                .iter()
                .position(|t| t.priority > task.priority)
                .unwrap_or(q.len());
            q.insert(insert_pos, task);
        }
    }

    /// Signal both the worker and the notification loop to stop.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }

    /// Queue a sync task directly (bypassing event handling), inserting it
    /// before the first lower-priority task so higher priorities run first.
    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
        let task = SyncTask {
            key: key.to_string(),
            cid,
            priority,
            queued_at: Instant::now(),
        };

        let mut q = self.queue.write().await;
        let insert_pos = q
            .iter()
            .position(|t| t.priority > task.priority)
            .unwrap_or(q.len());
        q.insert(insert_pos, task);
    }

    /// Snapshot the current workload: tracked trees, queued tasks, and
    /// in-flight syncs.
    pub async fn status(&self) -> SyncStatus {
        let subscriptions = self.subscriptions.read().await;
        let queue = self.queue.read().await;
        let syncing = self.syncing.read().await;

        SyncStatus {
            subscribed_trees: subscriptions.len(),
            queued_tasks: queue.len(),
            active_syncs: syncing.len(),
        }
    }
}
548
/// Point-in-time snapshot of the sync service's workload.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees currently tracked in the subscription map.
    pub subscribed_trees: usize,
    /// Tasks waiting in the sync queue.
    pub queued_tasks: usize,
    /// Tree syncs currently in flight.
    pub active_syncs: usize,
}