1use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_OWN, PRIORITY_FOLLOWED};
21use crate::webrtc::WebRTCState;
22
/// Relative importance of a sync task. Lower discriminant sorts first, and
/// the sync queue is kept ordered on insert, so `Own` work is always serviced
/// before `Followed` work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncPriority {
    /// Trees published by our own key.
    Own = 0,
    /// Trees published by contacts we follow.
    Followed = 1,
}
31
/// One unit of sync work: fetch the tree rooted at `cid`.
#[derive(Debug, Clone)]
pub struct SyncTask {
    /// Tree identifier, formatted as "owner/tree_name" (bech32 npub owner).
    pub key: String,
    /// Root content id of the tree to fetch.
    pub cid: Cid,
    /// Scheduling priority; governs the task's position in the queue.
    pub priority: SyncPriority,
    /// When the task entered the queue.
    pub queued_at: Instant,
}
44
/// Tunables for the background sync service. See `Default` for the
/// out-of-the-box values.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Subscribe to and sync trees published by our own key.
    pub sync_own: bool,
    /// Subscribe to and sync trees published by followed contacts.
    pub sync_followed: bool,
    /// Blossom HTTP servers used as fallback chunk sources.
    pub blossom_servers: Vec<String>,
    /// Nostr relays to connect to for tree-announcement events.
    pub relays: Vec<String>,
    /// Maximum number of trees fetched concurrently.
    pub max_concurrent: usize,
    /// WebRTC fetch timeout, in milliseconds.
    pub webrtc_timeout_ms: u64,
    /// Blossom fetch timeout, in milliseconds.
    pub blossom_timeout_ms: u64,
}
63
64impl Default for SyncConfig {
65 fn default() -> Self {
66 Self {
67 sync_own: true,
68 sync_followed: true,
69 blossom_servers: vec!["https://blossom.iris.to".to_string()],
70 relays: vec![
71 "wss://relay.damus.io".to_string(),
72 "wss://relay.primal.net".to_string(),
73 "wss://nos.lol".to_string(),
74 ],
75 max_concurrent: 3,
76 webrtc_timeout_ms: 2000,
77 blossom_timeout_ms: 10000,
78 }
79 }
80}
81
/// Tracked state for one announced tree, keyed in the subscriptions map by
/// the same "owner/tree_name" string stored in `key`.
#[allow(dead_code)]
struct TreeSubscription {
    // "owner/tree_name" identifier, duplicated from the map key.
    key: String,
    // Last announced root CID; `None` until the first announcement is seen.
    current_cid: Option<Cid>,
    // Priority assigned when the subscription was first created.
    priority: SyncPriority,
    // Time of last completed sync. NOTE(review): nothing in this file writes
    // it after creation — confirm whether it is maintained elsewhere.
    last_synced: Option<Instant>,
}
90
/// Background service that watches nostr relays for hashtree announcements
/// and fetches updated trees into local storage.
pub struct BackgroundSync {
    // Service tunables (relays, servers, timeouts, concurrency cap).
    config: SyncConfig,
    // Local chunk/tree store; shared with spawned fetch tasks.
    store: Arc<HashtreeStore>,
    // Optional WebRTC peer state used as the preferred fetch path.
    webrtc_state: Option<Arc<WebRTCState>>,
    // Nostr client connected to `config.relays`.
    client: Client,
    // Our public key; used to classify events as Own vs Followed.
    my_pubkey: PublicKey,
    // Per-tree state, keyed by "owner/tree_name".
    subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
    // Pending sync tasks, kept priority-ordered (front = highest priority).
    queue: Arc<RwLock<VecDeque<SyncTask>>>,
    // Hex root hashes currently being fetched (dedup + concurrency cap).
    syncing: Arc<RwLock<HashSet<String>>>,
    // Shutdown signal: `shutdown()` sends, both loops in `run` watch.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    // Shared fetcher configured from `config`.
    fetcher: Arc<Fetcher>,
}
112
impl BackgroundSync {
    /// Builds the service: derives our pubkey from `keys`, connects a nostr
    /// client to the configured relays, and initializes empty
    /// subscription/queue/syncing state plus the shutdown watch channel.
    ///
    /// Relay add failures are logged and skipped rather than aborting startup.
    pub async fn new(
        config: SyncConfig,
        store: Arc<HashtreeStore>,
        keys: Keys,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Result<Self> {
        let my_pubkey = keys.public_key();
        let client = Client::new(keys);

        // Best effort: a relay that fails to register is logged and skipped.
        for relay in &config.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }

        client.connect().await;

        // Shutdown signal shared by the worker task and notification loop.
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let fetch_config = FetchConfig {
            blossom_servers: config.blossom_servers.clone(),
            webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
            blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
        };
        let fetcher = Arc::new(Fetcher::new(fetch_config));

        Ok(Self {
            config,
            store,
            webrtc_state,
            client,
            my_pubkey,
            subscriptions: Arc::new(RwLock::new(HashMap::new())),
            queue: Arc::new(RwLock::new(VecDeque::new())),
            syncing: Arc::new(RwLock::new(HashSet::new())),
            shutdown_tx,
            shutdown_rx,
            fetcher,
        })
    }

    /// Main entry point: subscribes to tree announcements (own and/or
    /// followed, per config), spawns the detached queue-worker task, then
    /// processes relay notifications until `shutdown()` is signaled or the
    /// notification stream errors.
    ///
    /// `contacts_file` is expected to hold a JSON array of hex pubkeys for
    /// the "followed" subscriptions.
    pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
        info!("Starting background sync service");

        // Give relay connections a moment to establish before subscribing.
        tokio::time::sleep(Duration::from_secs(3)).await;

        if self.config.sync_own {
            self.subscribe_own_trees().await?;
        }

        if self.config.sync_followed {
            self.subscribe_followed_trees(&contacts_file).await?;
        }

        // Clones moved into the detached worker task below.
        let queue = self.queue.clone();
        let syncing = self.syncing.clone();
        let store = self.store.clone();
        let webrtc_state = self.webrtc_state.clone();
        let fetcher = self.fetcher.clone();
        let max_concurrent = self.config.max_concurrent;
        let mut shutdown_rx = self.shutdown_rx.clone();

        // Queue worker: every 500ms, pop at most one task and spawn a fetch
        // for it, bounded by `max_concurrent` in-flight syncs.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));

            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            info!("Sync worker shutting down");
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        // Respect the concurrency cap before taking work.
                        let current_syncing = syncing.read().await.len();
                        if current_syncing >= max_concurrent {
                            continue;
                        }

                        // The queue is kept priority-ordered on insert, so the
                        // front is always the highest-priority pending task.
                        let task = {
                            let mut q = queue.write().await;
                            q.pop_front()
                        };

                        if let Some(task) = task {
                            let hash_hex = to_hex(&task.cid.hash);

                            // Deduplicate: if this root hash is already being
                            // fetched, drop the task — a re-announcement of
                            // the same hash adds nothing.
                            {
                                let mut s = syncing.write().await;
                                if s.contains(&hash_hex) {
                                    continue;
                                }
                                s.insert(hash_hex.clone());
                            }

                            let syncing_clone = syncing.clone();
                            let store_clone = store.clone();
                            let webrtc_clone = webrtc_state.clone();
                            let fetcher_clone = fetcher.clone();

                            // One detached task per tree fetch.
                            tokio::spawn(async move {
                                let result = fetcher_clone.fetch_tree(
                                    &store_clone,
                                    webrtc_clone.as_ref(),
                                    &task.cid.hash,
                                ).await;

                                match result {
                                    Ok((chunks_fetched, bytes_fetched)) => {
                                        if chunks_fetched > 0 {
                                            info!(
                                                "Synced tree {} ({} chunks, {} bytes)",
                                                &hash_hex[..12],
                                                chunks_fetched,
                                                bytes_fetched
                                            );

                                            // Key format is "owner/name"; a key
                                            // without '/' becomes owner-only.
                                            let (owner, name) = task.key.split_once('/')
                                                .map(|(o, n)| (o.to_string(), Some(n)))
                                                .unwrap_or((task.key.clone(), None));

                                            let storage_priority = match task.priority {
                                                SyncPriority::Own => PRIORITY_OWN,
                                                SyncPriority::Followed => PRIORITY_FOLLOWED,
                                            };

                                            // Record the tree in the local
                                            // index; failure is logged, not
                                            // fatal.
                                            if let Err(e) = store_clone.index_tree(
                                                &task.cid.hash,
                                                &owner,
                                                name,
                                                storage_priority,
                                                Some(&task.key),
                                            ) {
                                                warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
                                            }

                                            // Newly stored data may push the
                                            // store over its quota.
                                            if let Err(e) = store_clone.evict_if_needed() {
                                                warn!("Eviction check failed: {}", e);
                                            }
                                        } else {
                                            tracing::debug!("Tree {} already synced", &hash_hex[..12]);
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
                                    }
                                }

                                // Always release the in-flight slot, success
                                // or failure.
                                syncing_clone.write().await.remove(&hash_hex);
                            });
                        }
                    }
                }
            }
        });

        // Notification loop: feed relevant relay events into the queue.
        let mut notifications = self.client.notifications();
        let subscriptions = self.subscriptions.clone();
        let queue = self.queue.clone();
        let mut shutdown_rx = self.shutdown_rx.clone();

        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        info!("Background sync shutting down");
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_tree_event(&event, &subscriptions, &queue).await;
                        }
                        Ok(_) => {}
                        Err(e) => {
                            // NOTE(review): any recv error ends the whole
                            // service — if this broadcast receiver can lag,
                            // a lagged error would also land here; confirm
                            // that stopping is the intended response.
                            error!("Notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Subscribes to kind-30078 events authored by our own key carrying the
    /// `l = "hashtree"` label. Subscription failure is logged and swallowed
    /// (the returned `Result` is currently always `Ok`).
    async fn subscribe_own_trees(&self) -> Result<()> {
        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .author(self.my_pubkey)
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!(
                    "Subscribed to own trees for {}",
                    self.my_pubkey.to_bech32().unwrap_or_default()
                );
            }
            Err(e) => {
                warn!("Failed to subscribe to own trees (will retry on reconnect): {}", e);
            }
        }

        Ok(())
    }

    /// Loads hex pubkeys from `contacts_file` (a JSON array of strings) and
    /// subscribes to their `l = "hashtree"` kind-30078 events.
    ///
    /// A missing file, unparsable JSON, or all-invalid pubkeys degrade to
    /// "no contacts" and return `Ok(())`; only a read error on an existing
    /// file propagates via `?`.
    async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
        let contacts: Vec<String> = if contacts_file.exists() {
            let data = std::fs::read_to_string(contacts_file)?;
            serde_json::from_str(&data).unwrap_or_default()
        } else {
            Vec::new()
        };

        if contacts.is_empty() {
            info!("No contacts to subscribe to");
            return Ok(());
        }

        // Invalid hex entries are silently skipped.
        let pubkeys: Vec<PublicKey> = contacts
            .iter()
            .filter_map(|hex| PublicKey::from_hex(hex).ok())
            .collect();

        if pubkeys.is_empty() {
            return Ok(());
        }

        let filter = Filter::new()
            .kind(Kind::Custom(30078))
            .authors(pubkeys.clone())
            .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);

        match self.client.subscribe(vec![filter], None).await {
            Ok(_) => {
                info!("Subscribed to {} followed users' trees", pubkeys.len());
            }
            Err(e) => {
                warn!("Failed to subscribe to followed trees (will retry on reconnect): {}", e);
            }
        }

        Ok(())
    }

    /// Parses a relay event and, if it announces a new root hash for a tree,
    /// records it in `subscriptions` and queues a priority-ordered sync task.
    ///
    /// Expected event shape: kind 30078 with an `l` tag equal to "hashtree",
    /// a `d` tag naming the tree, a `hash` tag (hex root hash), and an
    /// optional `key` tag (hex-encoded 32-byte key). Events missing any
    /// required piece are ignored.
    async fn handle_tree_event(
        &self,
        event: &Event,
        subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
        queue: &Arc<RwLock<VecDeque<SyncTask>>>,
    ) {
        // Label check: the event must carry ["l", "hashtree"].
        let has_hashtree_tag = event.tags.iter().any(|tag| {
            let v = tag.as_slice();
            v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
        });

        if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
            return;
        }

        // The `d` (identifier) tag carries the tree name.
        let d_tag = event.tags.iter().find_map(|tag| {
            if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
                Some(id.clone())
            } else {
                None
            }
        });

        let tree_name = match d_tag {
            Some(name) => name,
            None => return,
        };

        // Pull the root hash and optional encryption key out of the tags.
        let mut hash_hex: Option<String> = None;
        let mut key_hex: Option<String> = None;

        for tag in event.tags.iter() {
            let tag_vec = tag.as_slice();
            if tag_vec.len() >= 2 {
                match tag_vec[0].as_str() {
                    "hash" => hash_hex = Some(tag_vec[1].clone()),
                    "key" => key_hex = Some(tag_vec[1].clone()),
                    _ => {}
                }
            }
        }

        // No decodable hash means there is nothing to sync.
        let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
            Some(h) => h,
            None => return,
        };

        // The key must decode to exactly 32 bytes; otherwise it is dropped.
        let key = key_hex.and_then(|k| {
            let bytes = hex::decode(&k).ok()?;
            if bytes.len() == 32 {
                let mut arr = [0u8; 32];
                arr.copy_from_slice(&bytes);
                Some(arr)
            } else {
                None
            }
        });

        // Size is not carried by the announcement; 0 is the placeholder.
        let cid = Cid { hash, key, size: 0 };

        // Subscription key: "npub/tree_name" (hex pubkey as fallback).
        // Note: `key` is deliberately reshadowed here (bytes -> map key).
        let npub = event.pubkey.to_bech32().unwrap_or_else(|_| event.pubkey.to_hex());
        let key = format!("{}/{}", npub, tree_name);

        let priority = if event.pubkey == self.my_pubkey {
            SyncPriority::Own
        } else {
            SyncPriority::Followed
        };

        // Queue work only when the announced root differs from the last seen
        // one (a fresh subscription with `current_cid: None` always differs).
        let should_sync = {
            let mut subs = subscriptions.write().await;
            let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
                key: key.clone(),
                current_cid: None,
                priority,
                last_synced: None,
            });

            let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
            if changed {
                sub.current_cid = Some(cid.clone());
                true
            } else {
                false
            }
        };

        if should_sync {
            info!("New tree update: {} -> {}", key, to_hex(&cid.hash)[..12].to_string());

            let task = SyncTask {
                key,
                cid,
                priority,
                queued_at: Instant::now(),
            };

            let mut q = queue.write().await;

            // Priority-ordered insert: before the first strictly
            // lower-priority task (higher discriminant), else at the back.
            let insert_pos = q
                .iter()
                .position(|t| t.priority > task.priority)
                .unwrap_or(q.len());
            q.insert(insert_pos, task);
        }
    }

    /// Signals both loops in `run` (and the queue worker) to stop.
    /// Send errors are ignored, so calling this repeatedly is harmless.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }

    /// Manually enqueues a sync task for `cid`, keeping the queue
    /// priority-ordered (same insert rule as `handle_tree_event`).
    pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
        let task = SyncTask {
            key: key.to_string(),
            cid,
            priority,
            queued_at: Instant::now(),
        };

        let mut q = self.queue.write().await;
        let insert_pos = q
            .iter()
            .position(|t| t.priority > task.priority)
            .unwrap_or(q.len());
        q.insert(insert_pos, task);
    }

    /// Snapshot of current counters: tracked trees, queued tasks, and
    /// in-flight syncs. Takes the three read locks sequentially, so the
    /// counts are each individually consistent but not atomic as a set.
    pub async fn status(&self) -> SyncStatus {
        let subscriptions = self.subscriptions.read().await;
        let queue = self.queue.read().await;
        let syncing = self.syncing.read().await;

        SyncStatus {
            subscribed_trees: subscriptions.len(),
            queued_tasks: queue.len(),
            active_syncs: syncing.len(),
        }
    }
}
538
/// Point-in-time counters reported by `BackgroundSync::status`.
#[derive(Debug, Clone)]
pub struct SyncStatus {
    /// Number of trees currently tracked in the subscriptions map.
    pub subscribed_trees: usize,
    /// Number of tasks waiting in the sync queue.
    pub queued_tasks: usize,
    /// Number of tree fetches currently in flight.
    pub active_syncs: usize,
}