Skip to main content

statsai_daemon/
lib.rs

1//! Loopback API + file-watching daemon for `statsai`.
2
3use anyhow::{bail, Context, Result};
4use serde_json::json;
5use statsai_core::{
6    SyncAck, SyncBatch, SyncEntityCounts, SyncRejectedRecord, SYNC_ACK_SCHEMA_VERSION,
7    SYNC_BATCH_SCHEMA_VERSION,
8};
9use statsai_store::Store;
10use std::net::ToSocketAddrs;
11use std::sync::{Arc, Mutex, MutexGuard};
12use tiny_http::{Header, Method, Request, Response, Server, StatusCode};
13
14fn lock_store(store: &Arc<Mutex<Store>>) -> MutexGuard<'_, Store> {
15    store.lock().unwrap_or_else(|e| e.into_inner())
16}
17
18pub fn run(addr: &str, store: Arc<Mutex<Store>>) -> Result<()> {
19    ensure_loopback(addr)?;
20    let server =
21        Server::http(addr).map_err(|err| anyhow::anyhow!("start local API on {addr}: {err}"))?;
22
23    for request in server.incoming_requests() {
24        handle_request(request, &store)?;
25    }
26
27    Ok(())
28}
29
30fn handle_request(mut request: Request, store: &Arc<Mutex<Store>>) -> Result<()> {
31    let method = request.method().clone();
32    let url = request.url().to_string();
33
34    if method == Method::Post && url == "/v1/sync/batches" {
35        let mut body = String::new();
36        request
37            .as_reader()
38            .read_to_string(&mut body)
39            .context("read sync batch request")?;
40        let batch: SyncBatch = match serde_json::from_str(&body) {
41            Ok(batch) => batch,
42            Err(error) => {
43                return respond_text(request, StatusCode(400), &format!("invalid batch: {error}"));
44            }
45        };
46        let ack = {
47            let s = lock_store(store);
48            match ingest_sync_batch(&s, &batch) {
49                Ok(ack) => ack,
50                Err(error) => {
51                    return respond_text(request, StatusCode(400), &error.to_string());
52                }
53            }
54        };
55        return respond_json(request, StatusCode(200), &ack);
56    }
57
58    if method != Method::Get {
59        return respond_text(request, StatusCode(405), "method not allowed");
60    }
61
62    let s = lock_store(store);
63    let payload = match url.as_str() {
64        "/health" => json!({"status": "ok"}),
65        "/status" => json!({
66            "events": s.event_count()?,
67            "tokens": s.token_total()?
68        }),
69        "/sources" => serde_json::to_value(s.list_sources()?)?,
70        "/accounts" => serde_json::to_value(s.list_accounts()?)?,
71        "/source-account-assignments" => {
72            serde_json::to_value(s.list_source_account_assignments()?)?
73        }
74        "/subscriptions" => serde_json::to_value(s.list_subscriptions()?)?,
75        "/reports/weekly" => json!({
76            "events": s.event_count()?,
77            "tokens": s.token_total()?
78        }),
79        _ => {
80            drop(s);
81            return respond_text(request, StatusCode(404), "not found");
82        }
83    };
84    drop(s);
85
86    respond_json(request, StatusCode(200), &payload)
87}
88
89pub fn ingest_sync_batch(store: &Store, batch: &SyncBatch) -> Result<SyncAck> {
90    if batch.schema_version != SYNC_BATCH_SCHEMA_VERSION {
91        bail!("unsupported sync batch schema {}", batch.schema_version);
92    }
93
94    for source in &batch.sources {
95        store.upsert_source(source)?;
96    }
97    for account in &batch.accounts {
98        store.upsert_account(account)?;
99    }
100    for assignment in &batch.source_account_assignments {
101        store.upsert_source_account_assignment(assignment)?;
102    }
103    for subscription in &batch.subscriptions {
104        store.upsert_subscription(subscription)?;
105    }
106    let inserted_events = store.insert_events(&batch.events)?;
107    let written_summaries = store.upsert_summaries(&batch.summaries)?;
108
109    Ok(SyncAck {
110        schema_version: SYNC_ACK_SCHEMA_VERSION.to_string(),
111        batch_id: batch.batch_id.clone(),
112        accepted: SyncEntityCounts {
113            sources: batch.sources.len() as u64,
114            accounts: batch.accounts.len() as u64,
115            source_account_assignments: batch.source_account_assignments.len() as u64,
116            subscriptions: batch.subscriptions.len() as u64,
117            events: inserted_events,
118            summaries: written_summaries,
119        },
120        duplicates: SyncEntityCounts {
121            sources: 0,
122            accounts: 0,
123            source_account_assignments: 0,
124            subscriptions: 0,
125            events: (batch.events.len() as u64).saturating_sub(inserted_events),
126            summaries: 0,
127        },
128        rejected: Vec::<SyncRejectedRecord>::new(),
129    })
130}
131
132fn respond_json<T: serde::Serialize>(
133    request: Request,
134    status: StatusCode,
135    payload: &T,
136) -> Result<()> {
137    let body = serde_json::to_string_pretty(payload)?;
138    let response = Response::from_string(body)
139        .with_status_code(status)
140        .with_header(content_type_json());
141    request.respond(response)?;
142    Ok(())
143}
144
145fn respond_text(request: Request, status: StatusCode, body: &str) -> Result<()> {
146    let response = Response::from_string(body).with_status_code(status);
147    request.respond(response)?;
148    Ok(())
149}
150
151fn content_type_json() -> Header {
152    Header::from_bytes("content-type", "application/json").expect("static header is valid")
153}
154
155#[cfg(feature = "watch")]
156mod watch {
157    use anyhow::{Context, Result};
158    use chrono::{DateTime, Utc};
159    use notify::{Event, EventKind, RecursiveMode, Watcher};
160    use statsai_adapters::{default_adapters, ProviderAdapter, ScanCandidateFile, ScanOptions};
161    use statsai_core::{
162        timestamp_in_period, IdentitySource, ProviderAccountId, SourceAccountAssignment,
163        SourceKind, SourceLocation, SourceVerificationMode, UsageEvent, UsageSummary,
164    };
165    use statsai_store::{
166        effective_verified_source_state_is_missing, has_active_verified_source_assignment,
167        reconcile_verified_source_state, verified_source_state_hash, ScanFileStateEntry, Store,
168    };
169    use std::collections::HashSet;
170    use std::path::PathBuf;
171    use std::sync::mpsc;
172    use std::sync::{Arc, Mutex};
173    use std::time::Duration;
174    use tiny_http::Server;
175
176    pub fn watch_and_serve(addr: &str, store: Arc<Mutex<Store>>, device_id: &str) -> Result<()> {
177        super::ensure_loopback(addr)?;
178
179        let sources = {
180            let s = super::lock_store(&store);
181            discover_watch_sources(&s)
182        };
183        let (tx, rx) = mpsc::channel();
184
185        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
186            if let Ok(event) = res {
187                if matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_)) {
188                    let changed: Vec<PathBuf> = event.paths;
189                    let _ = tx.send(changed);
190                }
191            }
192        })
193        .context("create file watcher")?;
194
195        for path in &sources {
196            if let Err(e) = watcher.watch(path, RecursiveMode::Recursive) {
197                eprintln!("daemon: cannot watch {}: {e}", path.display());
198            } else {
199                eprintln!("daemon: watching {}", path.display());
200            }
201        }
202
203        eprintln!("daemon: API listening on http://{addr}");
204        let server = Server::http(addr)
205            .map_err(|err| anyhow::anyhow!("start local API on {addr}: {err}"))?;
206
207        loop {
208            match rx.recv_timeout(Duration::from_millis(250)) {
209                Ok(changed) => {
210                    let s = super::lock_store(&store);
211                    rescan_changed_sources(&s, device_id, &changed);
212                }
213                Err(mpsc::RecvTimeoutError::Timeout) => {}
214                Err(mpsc::RecvTimeoutError::Disconnected) => break,
215            }
216
217            if let Ok(Some(request)) = server.try_recv() {
218                super::handle_request(request, &store)?;
219            }
220        }
221
222        Ok(())
223    }
224
225    fn discover_watch_sources(store: &Store) -> Vec<PathBuf> {
226        let mut paths = Vec::new();
227
228        if let Ok(configured) = store.list_sources() {
229            for source in configured {
230                if source.source_kind != SourceKind::LocalAdapter {
231                    continue;
232                }
233                if let Some(label) = source.path_label.as_deref().filter(|p| !p.is_empty()) {
234                    let path = PathBuf::from(label);
235                    if path.is_dir() && !paths.contains(&path) {
236                        paths.push(path);
237                    }
238                }
239            }
240        }
241
242        for adapter in default_adapters() {
243            for source in adapter.discover() {
244                if source.source_kind != SourceKind::LocalAdapter {
245                    continue;
246                }
247                if let Some(label) = source.path_label.as_deref().filter(|p| !p.is_empty()) {
248                    let path = PathBuf::from(label);
249                    if path.is_dir() && !paths.contains(&path) {
250                        paths.push(path);
251                    }
252                }
253            }
254        }
255
256        paths
257    }
258
259    fn rescan_changed_sources(store: &Store, device_id: &str, changed: &[PathBuf]) {
260        let adapters: Vec<Box<dyn ProviderAdapter>> = default_adapters();
261        rescan_changed_sources_with_adapters(store, device_id, changed, &adapters);
262    }
263
264    fn rescan_changed_sources_with_adapters(
265        store: &Store,
266        device_id: &str,
267        changed: &[PathBuf],
268        adapters: &[Box<dyn ProviderAdapter>],
269    ) {
270        let configured = match store.list_sources() {
271            Ok(sources) => sources,
272            Err(e) => {
273                eprintln!("daemon: failed to list sources: {e}");
274                return;
275            }
276        };
277
278        for adapter in adapters {
279            let sources = scan_sources_for_paths(adapter.as_ref(), &configured, changed);
280            for mut source in sources {
281                let cache_candidates = match adapter.scan_candidates(&source) {
282                    Ok(candidates) => candidates,
283                    Err(e) => {
284                        eprintln!(
285                            "daemon: scan candidate discovery failed for {}: {e}",
286                            source.path_label.as_deref().unwrap_or("unknown")
287                        );
288                        continue;
289                    }
290                };
291                let file_cache_entries = scan_file_state_entries(&cache_candidates);
292                let pending_file_entries =
293                    match store.pending_scan_file_entries(&source.source_id, &file_cache_entries) {
294                        Ok(entries) => entries,
295                        Err(e) => {
296                            eprintln!(
297                                "daemon: scan cache lookup failed for {}: {e}",
298                                source.path_label.as_deref().unwrap_or("unknown")
299                            );
300                            continue;
301                        }
302                    };
303                let verification_mode = source.verification_mode.clone();
304                let probed_verified_source_state =
305                    if matches!(verification_mode, SourceVerificationMode::Disabled) {
306                        None
307                    } else {
308                        match adapter.probe_verified_source_state(&source) {
309                            Ok(state) => state,
310                            Err(e) => {
311                                eprintln!(
312                                    "daemon: verified auth probe failed for {}: {e}",
313                                    source.path_label.as_deref().unwrap_or("unknown")
314                                );
315                                continue;
316                            }
317                        }
318                    };
319                let next_verified_state_hash =
320                    if matches!(verification_mode, SourceVerificationMode::Auto) {
321                        match verified_source_state_hash(probed_verified_source_state.as_ref()) {
322                            Ok(hash) => hash,
323                            Err(e) => {
324                                eprintln!(
325                                    "daemon: verified auth hash failed for {}: {e}",
326                                    source.path_label.as_deref().unwrap_or("unknown")
327                                );
328                                continue;
329                            }
330                        }
331                    } else {
332                        None
333                    };
334                let verified_state_changed =
335                    matches!(verification_mode, SourceVerificationMode::Auto)
336                        && source.verified_state_hash != next_verified_state_hash;
337                let legacy_verified_state_needs_reconciliation =
338                    matches!(verification_mode, SourceVerificationMode::Auto)
339                        && source.verified_state_hash.is_none()
340                        && next_verified_state_hash.is_none()
341                        && effective_verified_source_state_is_missing(
342                            &probed_verified_source_state,
343                        )
344                        && match has_active_verified_source_assignment(store, &source.source_id) {
345                            Ok(active) => active,
346                            Err(e) => {
347                                eprintln!(
348                                    "daemon: verified assignment lookup failed for {}: {e}",
349                                    source.path_label.as_deref().unwrap_or("unknown")
350                                );
351                                continue;
352                            }
353                        };
354                if pending_file_entries.is_empty()
355                    && !verified_state_changed
356                    && !legacy_verified_state_needs_reconciliation
357                {
358                    continue;
359                }
360                let options = ScanOptions {
361                    device_id: device_id.to_string(),
362                    selected_cache_keys: Some(
363                        pending_file_entries
364                            .iter()
365                            .map(|entry| entry.cache_key.clone())
366                            .collect::<HashSet<_>>(),
367                    ),
368                };
369                let scan_result = if pending_file_entries.is_empty() {
370                    Ok(statsai_adapters::AdapterScan::default())
371                } else {
372                    adapter.scan(&source, &options)
373                };
374                match scan_result {
375                    Ok(mut scan) => {
376                        let parsed_events = scan.events.len();
377                        let parsed_summaries = scan.summaries.len();
378                        let effective_verified_source_state =
379                            if matches!(verification_mode, SourceVerificationMode::Disabled) {
380                                None
381                            } else if pending_file_entries.is_empty() {
382                                probed_verified_source_state
383                            } else {
384                                scan.verified_source_state
385                                    .take()
386                                    .or(probed_verified_source_state)
387                            };
388                        if let Err(e) = reconcile_verified_source_state(
389                            store,
390                            &mut source,
391                            effective_verified_source_state.as_ref(),
392                            next_verified_state_hash,
393                        ) {
394                            eprintln!("daemon: verified auth reconciliation failed: {e}");
395                            continue;
396                        }
397                        if let Err(e) = store.upsert_source(&source) {
398                            eprintln!("daemon: update source verified auth state failed: {e}");
399                            continue;
400                        }
401                        if pending_file_entries.is_empty() {
402                            eprintln!(
403                                "daemon: reconciled auth state for {} ({})",
404                                source.provider,
405                                source.path_label.as_deref().unwrap_or("unknown")
406                            );
407                            continue;
408                        }
409                        if let Err(e) = apply_source_account_resolution(
410                            store,
411                            &source,
412                            &mut scan.events,
413                            &mut scan.summaries,
414                        ) {
415                            eprintln!("daemon: account resolution failed: {e}");
416                            continue;
417                        }
418                        let inserted_events = match store.insert_events(&scan.events) {
419                            Ok(count) => count,
420                            Err(e) => {
421                                eprintln!("daemon: insert events failed: {e}");
422                                continue;
423                            }
424                        };
425                        let written_summaries = match store.upsert_summaries(&scan.summaries) {
426                            Ok(count) => count,
427                            Err(e) => {
428                                eprintln!("daemon: insert summaries failed: {e}");
429                                continue;
430                            }
431                        };
432                        if let Err(e) =
433                            store.record_scan_file_entries(&source.source_id, &pending_file_entries)
434                        {
435                            eprintln!("daemon: update scan cache failed: {e}");
436                            continue;
437                        }
438                        eprintln!(
439                            "daemon: rescanned {} ({}) — files={}, cached={}, parsed_events={}, inserted_events={}, parsed_summaries={}, summaries_written={}",
440                            source.provider,
441                            source.path_label.as_deref().unwrap_or("unknown"),
442                            scan.diagnostics.files_scanned,
443                            scan.diagnostics.files_skipped_unchanged,
444                            parsed_events,
445                            inserted_events,
446                            parsed_summaries,
447                            written_summaries
448                        );
449                    }
450                    Err(e) => {
451                        eprintln!(
452                            "daemon: scan failed for {}: {e}",
453                            source.path_label.as_deref().unwrap_or("unknown")
454                        );
455                    }
456                }
457            }
458        }
459    }
460
461    fn scan_sources_for_paths(
462        adapter: &dyn ProviderAdapter,
463        configured: &[SourceLocation],
464        changed: &[PathBuf],
465    ) -> Vec<SourceLocation> {
466        let mut sources = Vec::new();
467        for source in configured
468            .iter()
469            .filter(|s| {
470                s.enabled
471                    && s.source_kind == SourceKind::LocalAdapter
472                    && s.provider == adapter.provider()
473            })
474            .cloned()
475        {
476            if source.path_label.is_some() && source_in_changed_paths(&source, changed) {
477                sources.push(source);
478            }
479        }
480        for source in adapter.discover() {
481            if source.source_kind != SourceKind::LocalAdapter || source.path_label.is_none() {
482                continue;
483            }
484            if source_in_changed_paths(&source, changed)
485                && !sources.iter().any(|s| s.source_id == source.source_id)
486            {
487                sources.push(source);
488            }
489        }
490        sources
491    }
492
493    fn source_in_changed_paths(source: &SourceLocation, changed: &[PathBuf]) -> bool {
494        let Some(label) = source.path_label.as_deref() else {
495            return false;
496        };
497        let source_path = PathBuf::from(label);
498        changed.iter().any(|changed_path| {
499            changed_path.starts_with(&source_path) || source_path.starts_with(changed_path)
500        })
501    }
502
503    fn scan_file_state_entries(candidates: &[ScanCandidateFile]) -> Vec<ScanFileStateEntry> {
504        candidates
505            .iter()
506            .map(|candidate| ScanFileStateEntry {
507                cache_key: candidate.cache_key.clone(),
508                cache_signature: candidate.cache_signature.clone(),
509            })
510            .collect()
511    }
512
513    fn apply_source_account_resolution(
514        store: &Store,
515        source: &SourceLocation,
516        events: &mut [UsageEvent],
517        summaries: &mut [UsageSummary],
518    ) -> Result<()> {
519        let assignments = store.list_source_account_assignments_for_source(&source.source_id)?;
520        for event in events {
521            apply_account_resolution_to_event(&assignments, event);
522        }
523        for summary in summaries {
524            apply_account_resolution_to_summary(&assignments, summary);
525        }
526        Ok(())
527    }
528
529    fn apply_account_resolution_to_event(
530        assignments: &[SourceAccountAssignment],
531        event: &mut UsageEvent,
532    ) {
533        if keep_detected_account_identity(
534            event.provider_account_id.as_ref(),
535            event
536                .parse_evidence
537                .as_ref()
538                .map(|evidence| &evidence.account_identity_source),
539        ) {
540            return;
541        }
542        let assignment = assignment_for_timestamp(assignments, event.session.started_at);
543        if let Some(assignment) = assignment {
544            event.provider_account_id = Some(assignment.provider_account_id.clone());
545            if let Some(evidence) = event.parse_evidence.as_mut() {
546                evidence.account_identity_source = IdentitySource::SourceConfig;
547            }
548        } else if should_clear_resolved_account(
549            event.provider_account_id.as_ref(),
550            event
551                .parse_evidence
552                .as_ref()
553                .map(|evidence| &evidence.account_identity_source),
554        ) {
555            event.provider_account_id = None;
556            if let Some(evidence) = event.parse_evidence.as_mut() {
557                evidence.account_identity_source = IdentitySource::Unresolved;
558            }
559        }
560    }
561
562    fn apply_account_resolution_to_summary(
563        assignments: &[SourceAccountAssignment],
564        summary: &mut UsageSummary,
565    ) {
566        if keep_detected_account_identity(
567            summary.provider_account_id.as_ref(),
568            summary
569                .parse_evidence
570                .as_ref()
571                .map(|evidence| &evidence.account_identity_source),
572        ) {
573            return;
574        }
575        let timestamp = summary.period_start.unwrap_or(summary.observed_at);
576        let assignment = assignment_for_timestamp(assignments, timestamp);
577        if let Some(assignment) = assignment {
578            summary.provider_account_id = Some(assignment.provider_account_id.clone());
579            if let Some(evidence) = summary.parse_evidence.as_mut() {
580                evidence.account_identity_source = IdentitySource::SourceConfig;
581            }
582        } else if should_clear_resolved_account(
583            summary.provider_account_id.as_ref(),
584            summary
585                .parse_evidence
586                .as_ref()
587                .map(|evidence| &evidence.account_identity_source),
588        ) {
589            summary.provider_account_id = None;
590            if let Some(evidence) = summary.parse_evidence.as_mut() {
591                evidence.account_identity_source = IdentitySource::Unresolved;
592            }
593        }
594    }
595
596    fn keep_detected_account_identity(
597        provider_account_id: Option<&ProviderAccountId>,
598        identity_source: Option<&IdentitySource>,
599    ) -> bool {
600        let Some(provider_account_id) = provider_account_id else {
601            return false;
602        };
603        if provider_account_id.0.trim().is_empty() {
604            return false;
605        }
606        let Some(identity_source) = identity_source else {
607            return false;
608        };
609        !matches!(
610            identity_source,
611            IdentitySource::SourceConfig
612                | IdentitySource::UserConfigured
613                | IdentitySource::ManualHint
614                | IdentitySource::Unknown
615                | IdentitySource::Unresolved
616        )
617    }
618
619    fn should_clear_resolved_account(
620        provider_account_id: Option<&ProviderAccountId>,
621        identity_source: Option<&IdentitySource>,
622    ) -> bool {
623        let Some(provider_account_id) = provider_account_id else {
624            return false;
625        };
626        if provider_account_id.0.trim().is_empty() {
627            return false;
628        }
629        matches!(
630            identity_source,
631            None | Some(
632                IdentitySource::SourceConfig
633                    | IdentitySource::UserConfigured
634                    | IdentitySource::ManualHint
635                    | IdentitySource::Unknown
636                    | IdentitySource::Unresolved
637            )
638        )
639    }
640
641    fn assignment_for_timestamp(
642        assignments: &[SourceAccountAssignment],
643        timestamp: DateTime<Utc>,
644    ) -> Option<&SourceAccountAssignment> {
645        assignments
646            .iter()
647            .filter(|assignment| {
648                timestamp_in_period(timestamp, assignment.started_at, assignment.ended_at)
649            })
650            .max_by(|left, right| left.started_at.cmp(&right.started_at))
651    }
652
653    #[cfg(test)]
654    mod tests {
655        use super::*;
656        use chrono::TimeZone;
657        use statsai_core::{
658            BillingPeriod, LocationOrigin, SubscriptionStatus, VerifiedSourceState,
659            VerifiedSubscriptionState,
660        };
661        use std::sync::{Arc, Mutex};
662
663        struct TestAdapter {
664            provider: &'static str,
665            verified_state: Option<VerifiedSourceState>,
666            scan_calls: Arc<Mutex<u64>>,
667        }
668
669        impl ProviderAdapter for TestAdapter {
670            fn id(&self) -> &'static str {
671                "test-watch-adapter"
672            }
673
674            fn version(&self) -> &'static str {
675                "0.0.0"
676            }
677
678            fn provider(&self) -> &'static str {
679                self.provider
680            }
681
682            fn discover(&self) -> Vec<SourceLocation> {
683                Vec::new()
684            }
685
686            fn scan_candidates(&self, _source: &SourceLocation) -> Result<Vec<ScanCandidateFile>> {
687                Ok(Vec::new())
688            }
689
690            fn probe_verified_source_state(
691                &self,
692                _source: &SourceLocation,
693            ) -> Result<Option<VerifiedSourceState>> {
694                Ok(self.verified_state.clone())
695            }
696
697            fn scan(
698                &self,
699                _source: &SourceLocation,
700                _options: &ScanOptions,
701            ) -> Result<statsai_adapters::AdapterScan> {
702                *self.scan_calls.lock().expect("scan calls") += 1;
703                Ok(statsai_adapters::AdapterScan::default())
704            }
705        }
706
707        #[test]
708        fn rescan_changed_sources_reconciles_verified_auth_without_pending_usage_files() {
709            let store = Store::in_memory().expect("store");
710            let root =
711                std::env::temp_dir().join(format!("statsai-watch-auth-{}", std::process::id()));
712            std::fs::create_dir_all(&root).expect("temp source root");
713            let mut source = SourceLocation::local_adapter(
714                "codex",
715                "test",
716                "0",
717                &root,
718                LocationOrigin::Configured,
719            );
720            source.verification_mode = SourceVerificationMode::Auto;
721            store.upsert_source(&source).expect("source");
722
723            let authenticated_at = Utc
724                .with_ymd_and_hms(2026, 5, 29, 10, 12, 43)
725                .single()
726                .expect("authenticated_at");
727            let verified_at = Utc
728                .with_ymd_and_hms(2026, 5, 29, 10, 14, 56)
729                .single()
730                .expect("verified_at");
731            let current_period_ends_at = Utc
732                .with_ymd_and_hms(2026, 6, 29, 10, 12, 43)
733                .single()
734                .expect("current_period_ends_at");
735            let scan_calls = Arc::new(Mutex::new(0u64));
736            let adapters: Vec<Box<dyn ProviderAdapter>> = vec![Box::new(TestAdapter {
737                provider: "codex",
738                verified_state: Some(VerifiedSourceState {
739                    provider_user_id: Some("acct-watch".to_string()),
740                    email: Some("watch@example.com".to_string()),
741                    account_label: None,
742                    plan_name: Some("Plus".to_string()),
743                    authenticated_at: Some(authenticated_at),
744                    verified_at: Some(verified_at),
745                    subscription: Some(VerifiedSubscriptionState {
746                        plan_name: "Plus".to_string(),
747                        price: 2000,
748                        currency: "USD".to_string(),
749                        billing_period: BillingPeriod::Monthly,
750                        paid_at: Some(authenticated_at),
751                        started_at: authenticated_at,
752                        ended_at: Some(current_period_ends_at),
753                        current_period_ends_at: Some(current_period_ends_at),
754                        status: SubscriptionStatus::Active,
755                        verified_at: Some(verified_at),
756                    }),
757                }),
758                scan_calls: scan_calls.clone(),
759            })];
760
761            rescan_changed_sources_with_adapters(
762                &store,
763                "device-test",
764                &[
765                    PathBuf::from(source.path_label.as_deref().expect("path label"))
766                        .join("auth.json"),
767                ],
768                &adapters,
769            );
770
771            assert_eq!(*scan_calls.lock().expect("scan calls"), 0);
772            assert_eq!(store.list_accounts().expect("accounts").len(), 1);
773            assert_eq!(store.list_subscriptions().expect("subscriptions").len(), 1);
774            let assignments = store
775                .list_source_account_assignments_for_source(&source.source_id)
776                .expect("assignments");
777            assert_eq!(assignments.len(), 1);
778            assert_eq!(assignments[0].started_at, authenticated_at);
779            assert_eq!(assignments[0].ended_at, None);
780            assert_eq!(assignments[0].record_source, IdentitySource::LocalAuth);
781            let stored_source = store
782                .source(&source.source_id)
783                .expect("source")
784                .expect("stored source");
785            assert!(stored_source.verified_state_hash.is_some());
786
787            let _ = std::fs::remove_dir_all(&root);
788        }
789    }
790}
791
792#[cfg(not(feature = "watch"))]
793pub fn watch_and_serve(_addr: &str, _store: Arc<Mutex<Store>>, _device_id: &str) -> Result<()> {
794    anyhow::bail!(
795        "daemon --watch requires the `watch` cargo feature (enable with --features watch)"
796    )
797}
798
799#[cfg(feature = "watch")]
800pub fn watch_and_serve(addr: &str, store: Arc<Mutex<Store>>, device_id: &str) -> Result<()> {
801    watch::watch_and_serve(addr, store, device_id)
802}
803
804fn ensure_loopback(addr: &str) -> Result<()> {
805    let mut addrs = addr.to_socket_addrs()?;
806    let Some(addr) = addrs.next() else {
807        anyhow::bail!("local API address did not resolve");
808    };
809    if !addr.ip().is_loopback() {
810        anyhow::bail!("local API must bind to a loopback address");
811    }
812    Ok(())
813}
814
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Batch with the supported schema version and no records of any kind.
    fn empty_batch() -> SyncBatch {
        SyncBatch {
            schema_version: SYNC_BATCH_SCHEMA_VERSION.to_string(),
            batch_id: "batch_test".to_string(),
            device_id: "device_test".to_string(),
            sources: Vec::new(),
            accounts: Vec::new(),
            source_account_assignments: Vec::new(),
            subscriptions: Vec::new(),
            events: Vec::new(),
            summaries: Vec::new(),
            created_at: Utc::now(),
        }
    }

    /// An empty batch is acknowledged with zero event counts and no rejections.
    #[test]
    fn ingest_empty_sync_batch_returns_ack() {
        let store = Store::in_memory().expect("store");
        let batch = empty_batch();

        let ack = ingest_sync_batch(&store, &batch).expect("ack");

        assert_eq!(ack.schema_version, SYNC_ACK_SCHEMA_VERSION);
        assert_eq!(ack.batch_id, "batch_test");
        assert_eq!(ack.accepted.events, 0);
        assert_eq!(ack.duplicates.events, 0);
        assert!(ack.rejected.is_empty());
    }

    /// A batch carrying an unknown schema version is refused.
    #[test]
    fn ingest_rejects_unsupported_schema() {
        let store = Store::in_memory().expect("store");
        let batch = SyncBatch {
            schema_version: "sync_batch.v0".to_string(),
            ..empty_batch()
        };

        let error = ingest_sync_batch(&store, &batch).expect_err("unsupported schema");
        let message = error.to_string();
        assert!(message.contains("unsupported sync batch schema"));
    }
}