1use anyhow::Result;
10use hashtree_core::{from_hex, to_hex, Cid};
11use nostr_sdk::prelude::*;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{error, info, warn};
18
19use crate::fetch::{FetchConfig, Fetcher};
20use crate::storage::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
21use crate::webrtc::WebRTCState;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
25pub enum SyncPriority {
26 Own = 0,
28 Followed = 1,
30}
31
32#[derive(Debug, Clone)]
34pub struct SyncTask {
35 pub key: String,
37 pub cid: Cid,
39 pub priority: SyncPriority,
41 pub queued_at: Instant,
43}
44
45#[derive(Debug, Clone)]
47pub struct SyncConfig {
48 pub sync_own: bool,
50 pub sync_followed: bool,
52 pub relays: Vec<String>,
54 pub max_concurrent: usize,
56 pub webrtc_timeout_ms: u64,
58 pub blossom_timeout_ms: u64,
60}
61
62impl Default for SyncConfig {
63 fn default() -> Self {
64 Self {
65 sync_own: true,
66 sync_followed: true,
67 relays: hashtree_config::DEFAULT_RELAYS
68 .iter()
69 .map(|s| s.to_string())
70 .collect(),
71 max_concurrent: 3,
72 webrtc_timeout_ms: 2000,
73 blossom_timeout_ms: 10000,
74 }
75 }
76}
77
78impl SyncConfig {
79 pub fn from_config(config: &hashtree_config::Config) -> Self {
81 Self {
82 sync_own: true,
83 sync_followed: true,
84 relays: config.nostr.relays.clone(),
85 max_concurrent: 3,
86 webrtc_timeout_ms: 2000,
87 blossom_timeout_ms: 10000,
88 }
89 }
90}
91
92#[allow(dead_code)]
94struct TreeSubscription {
95 key: String,
96 current_cid: Option<Cid>,
97 priority: SyncPriority,
98 last_synced: Option<Instant>,
99}
100
101pub struct BackgroundSync {
103 config: SyncConfig,
104 store: Arc<HashtreeStore>,
105 webrtc_state: Option<Arc<WebRTCState>>,
106 client: Client,
108 my_pubkey: PublicKey,
110 subscriptions: Arc<RwLock<HashMap<String, TreeSubscription>>>,
112 queue: Arc<RwLock<VecDeque<SyncTask>>>,
114 syncing: Arc<RwLock<HashSet<String>>>,
116 shutdown_tx: tokio::sync::watch::Sender<bool>,
118 shutdown_rx: tokio::sync::watch::Receiver<bool>,
119 fetcher: Arc<Fetcher>,
121}
122
123impl BackgroundSync {
124 pub async fn new(
126 config: SyncConfig,
127 store: Arc<HashtreeStore>,
128 keys: Keys,
129 webrtc_state: Option<Arc<WebRTCState>>,
130 ) -> Result<Self> {
131 let my_pubkey = keys.public_key();
132 let client = Client::new(keys);
133
134 for relay in &config.relays {
136 if let Err(e) = client.add_relay(relay).await {
137 warn!("Failed to add relay {}: {}", relay, e);
138 }
139 }
140
141 client.connect().await;
143
144 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
145
146 let fetch_config = FetchConfig {
149 webrtc_timeout: Duration::from_millis(config.webrtc_timeout_ms),
150 blossom_timeout: Duration::from_millis(config.blossom_timeout_ms),
151 };
152 let fetcher = Arc::new(Fetcher::new(fetch_config));
153
154 Ok(Self {
155 config,
156 store,
157 webrtc_state,
158 client,
159 my_pubkey,
160 subscriptions: Arc::new(RwLock::new(HashMap::new())),
161 queue: Arc::new(RwLock::new(VecDeque::new())),
162 syncing: Arc::new(RwLock::new(HashSet::new())),
163 shutdown_tx,
164 shutdown_rx,
165 fetcher,
166 })
167 }
168
169 pub async fn run(&self, contacts_file: PathBuf) -> Result<()> {
171 info!("Starting background sync service");
172
173 tokio::time::sleep(Duration::from_secs(3)).await;
175
176 if self.config.sync_own {
178 self.subscribe_own_trees().await?;
179 }
180
181 if self.config.sync_followed {
183 self.subscribe_followed_trees(&contacts_file).await?;
184 }
185
186 let queue = self.queue.clone();
188 let syncing = self.syncing.clone();
189 let store = self.store.clone();
190 let webrtc_state = self.webrtc_state.clone();
191 let fetcher = self.fetcher.clone();
192 let max_concurrent = self.config.max_concurrent;
193 let mut shutdown_rx = self.shutdown_rx.clone();
194
195 tokio::spawn(async move {
197 let mut interval = tokio::time::interval(Duration::from_millis(500));
198
199 loop {
200 tokio::select! {
201 _ = shutdown_rx.changed() => {
202 if *shutdown_rx.borrow() {
203 info!("Sync worker shutting down");
204 break;
205 }
206 }
207 _ = interval.tick() => {
208 let current_syncing = syncing.read().await.len();
210 if current_syncing >= max_concurrent {
211 continue;
212 }
213
214 let task = {
216 let mut q = queue.write().await;
217 q.pop_front()
218 };
219
220 if let Some(task) = task {
221 let hash_hex = to_hex(&task.cid.hash);
222
223 {
225 let mut s = syncing.write().await;
226 if s.contains(&hash_hex) {
227 continue;
228 }
229 s.insert(hash_hex.clone());
230 }
231
232 let syncing_clone = syncing.clone();
234 let store_clone = store.clone();
235 let webrtc_clone = webrtc_state.clone();
236 let fetcher_clone = fetcher.clone();
237
238 tokio::spawn(async move {
239 let result = fetcher_clone.fetch_tree(
240 &store_clone,
241 webrtc_clone.as_ref(),
242 &task.cid.hash,
243 ).await;
244
245 match result {
246 Ok((chunks_fetched, bytes_fetched)) => {
247 if chunks_fetched > 0 {
248 info!(
249 "Synced tree {} ({} chunks, {} bytes)",
250 &hash_hex[..12],
251 chunks_fetched,
252 bytes_fetched
253 );
254
255 let (owner, name) = task.key.split_once('/')
258 .map(|(o, n)| (o.to_string(), Some(n)))
259 .unwrap_or((task.key.clone(), None));
260
261 let storage_priority = match task.priority {
263 SyncPriority::Own => PRIORITY_OWN,
264 SyncPriority::Followed => PRIORITY_FOLLOWED,
265 };
266
267 if let Err(e) = store_clone.index_tree(
268 &task.cid.hash,
269 &owner,
270 name,
271 storage_priority,
272 Some(&task.key), ) {
274 warn!("Failed to index tree {}: {}", &hash_hex[..12], e);
275 }
276
277 if let Err(e) = store_clone.evict_if_needed() {
279 warn!("Eviction check failed: {}", e);
280 }
281 } else {
282 tracing::debug!("Tree {} already synced", &hash_hex[..12]);
283 }
284 }
285 Err(e) => {
286 warn!("Failed to sync tree {}: {}", &hash_hex[..12], e);
287 }
288 }
289
290 syncing_clone.write().await.remove(&hash_hex);
292 });
293 }
294 }
295 }
296 }
297 });
298
299 let mut notifications = self.client.notifications();
301 let subscriptions = self.subscriptions.clone();
302 let queue = self.queue.clone();
303 let mut shutdown_rx = self.shutdown_rx.clone();
304
305 loop {
306 tokio::select! {
307 _ = shutdown_rx.changed() => {
308 if *shutdown_rx.borrow() {
309 info!("Background sync shutting down");
310 break;
311 }
312 }
313 notification = notifications.recv() => {
314 match notification {
315 Ok(RelayPoolNotification::Event { event, .. }) => {
316 self.handle_tree_event(&event, &subscriptions, &queue).await;
317 }
318 Ok(_) => {}
319 Err(e) => {
320 error!("Notification error: {}", e);
321 break;
322 }
323 }
324 }
325 }
326 }
327
328 Ok(())
329 }
330
331 async fn subscribe_own_trees(&self) -> Result<()> {
333 let filter = Filter::new()
334 .kind(Kind::Custom(30078))
335 .author(self.my_pubkey)
336 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
337
338 match self.client.subscribe(vec![filter], None).await {
339 Ok(_) => {
340 info!(
341 "Subscribed to own trees for {}",
342 self.my_pubkey.to_bech32().unwrap_or_default()
343 );
344 }
345 Err(e) => {
346 warn!(
347 "Failed to subscribe to own trees (will retry on reconnect): {}",
348 e
349 );
350 }
351 }
352
353 Ok(())
354 }
355
356 async fn subscribe_followed_trees(&self, contacts_file: &PathBuf) -> Result<()> {
358 let contacts: Vec<String> = if contacts_file.exists() {
360 let data = std::fs::read_to_string(contacts_file)?;
361 serde_json::from_str(&data).unwrap_or_default()
362 } else {
363 Vec::new()
364 };
365
366 if contacts.is_empty() {
367 info!("No contacts to subscribe to");
368 return Ok(());
369 }
370
371 let pubkeys: Vec<PublicKey> = contacts
373 .iter()
374 .filter_map(|hex| PublicKey::from_hex(hex).ok())
375 .collect();
376
377 if pubkeys.is_empty() {
378 return Ok(());
379 }
380
381 let filter = Filter::new()
383 .kind(Kind::Custom(30078))
384 .authors(pubkeys.clone())
385 .custom_tag(SingleLetterTag::lowercase(Alphabet::L), vec!["hashtree"]);
386
387 match self.client.subscribe(vec![filter], None).await {
388 Ok(_) => {
389 info!("Subscribed to {} followed users' trees", pubkeys.len());
390 }
391 Err(e) => {
392 warn!(
393 "Failed to subscribe to followed trees (will retry on reconnect): {}",
394 e
395 );
396 }
397 }
398
399 Ok(())
400 }
401
402 async fn handle_tree_event(
404 &self,
405 event: &Event,
406 subscriptions: &Arc<RwLock<HashMap<String, TreeSubscription>>>,
407 queue: &Arc<RwLock<VecDeque<SyncTask>>>,
408 ) {
409 let has_hashtree_tag = event.tags.iter().any(|tag| {
411 let v = tag.as_slice();
412 v.len() >= 2 && v[0] == "l" && v[1] == "hashtree"
413 });
414
415 if !has_hashtree_tag || event.kind != Kind::Custom(30078) {
416 return;
417 }
418
419 let d_tag = event.tags.iter().find_map(|tag| {
421 if let Some(TagStandard::Identifier(id)) = tag.as_standardized() {
422 Some(id.clone())
423 } else {
424 None
425 }
426 });
427
428 let tree_name = match d_tag {
429 Some(name) => name,
430 None => return,
431 };
432
433 let mut hash_hex: Option<String> = None;
435 let mut key_hex: Option<String> = None;
436
437 for tag in event.tags.iter() {
438 let tag_vec = tag.as_slice();
439 if tag_vec.len() >= 2 {
440 match tag_vec[0].as_str() {
441 "hash" => hash_hex = Some(tag_vec[1].clone()),
442 "key" => key_hex = Some(tag_vec[1].clone()),
443 _ => {}
444 }
445 }
446 }
447
448 let hash = match hash_hex.and_then(|h| from_hex(&h).ok()) {
449 Some(h) => h,
450 None => return,
451 };
452
453 let key = key_hex.and_then(|k| {
454 let bytes = hex::decode(&k).ok()?;
455 if bytes.len() == 32 {
456 let mut arr = [0u8; 32];
457 arr.copy_from_slice(&bytes);
458 Some(arr)
459 } else {
460 None
461 }
462 });
463
464 let cid = Cid { hash, key };
465
466 let npub = event
468 .pubkey
469 .to_bech32()
470 .unwrap_or_else(|_| event.pubkey.to_hex());
471 let key = format!("{}/{}", npub, tree_name);
472
473 let priority = if event.pubkey == self.my_pubkey {
475 SyncPriority::Own
476 } else {
477 SyncPriority::Followed
478 };
479
480 let should_sync = {
482 let mut subs = subscriptions.write().await;
483 let sub = subs.entry(key.clone()).or_insert(TreeSubscription {
484 key: key.clone(),
485 current_cid: None,
486 priority,
487 last_synced: None,
488 });
489
490 let changed = sub.current_cid.as_ref().map(|c| c.hash) != Some(cid.hash);
492 if changed {
493 sub.current_cid = Some(cid.clone());
494 true
495 } else {
496 false
497 }
498 };
499
500 if should_sync {
501 info!(
502 "New tree update: {} -> {}",
503 key,
504 to_hex(&cid.hash)[..12].to_string()
505 );
506
507 let task = SyncTask {
509 key,
510 cid,
511 priority,
512 queued_at: Instant::now(),
513 };
514
515 let mut q = queue.write().await;
516
517 let insert_pos = q
519 .iter()
520 .position(|t| t.priority > task.priority)
521 .unwrap_or(q.len());
522 q.insert(insert_pos, task);
523 }
524 }
525
526 pub fn shutdown(&self) {
528 let _ = self.shutdown_tx.send(true);
529 }
530
531 pub async fn queue_sync(&self, key: &str, cid: Cid, priority: SyncPriority) {
533 let task = SyncTask {
534 key: key.to_string(),
535 cid,
536 priority,
537 queued_at: Instant::now(),
538 };
539
540 let mut q = self.queue.write().await;
541 let insert_pos = q
542 .iter()
543 .position(|t| t.priority > task.priority)
544 .unwrap_or(q.len());
545 q.insert(insert_pos, task);
546 }
547
548 pub async fn status(&self) -> SyncStatus {
550 let subscriptions = self.subscriptions.read().await;
551 let queue = self.queue.read().await;
552 let syncing = self.syncing.read().await;
553
554 SyncStatus {
555 subscribed_trees: subscriptions.len(),
556 queued_tasks: queue.len(),
557 active_syncs: syncing.len(),
558 }
559 }
560}
561
562#[derive(Debug, Clone)]
564pub struct SyncStatus {
565 pub subscribed_trees: usize,
566 pub queued_tasks: usize,
567 pub active_syncs: usize,
568}