1use anyhow::{bail, Context, Result};
4use serde_json::json;
5use statsai_core::{
6 SyncAck, SyncBatch, SyncEntityCounts, SyncRejectedRecord, SYNC_ACK_SCHEMA_VERSION,
7 SYNC_BATCH_SCHEMA_VERSION,
8};
9use statsai_store::Store;
10use std::net::ToSocketAddrs;
11use std::sync::{Arc, Mutex, MutexGuard};
12use tiny_http::{Header, Method, Request, Response, Server, StatusCode};
13
14fn lock_store(store: &Arc<Mutex<Store>>) -> MutexGuard<'_, Store> {
15 store.lock().unwrap_or_else(|e| e.into_inner())
16}
17
18pub fn run(addr: &str, store: Arc<Mutex<Store>>) -> Result<()> {
19 ensure_loopback(addr)?;
20 let server =
21 Server::http(addr).map_err(|err| anyhow::anyhow!("start local API on {addr}: {err}"))?;
22
23 for request in server.incoming_requests() {
24 handle_request(request, &store)?;
25 }
26
27 Ok(())
28}
29
30fn handle_request(mut request: Request, store: &Arc<Mutex<Store>>) -> Result<()> {
31 let method = request.method().clone();
32 let url = request.url().to_string();
33
34 if method == Method::Post && url == "/v1/sync/batches" {
35 let mut body = String::new();
36 request
37 .as_reader()
38 .read_to_string(&mut body)
39 .context("read sync batch request")?;
40 let batch: SyncBatch = match serde_json::from_str(&body) {
41 Ok(batch) => batch,
42 Err(error) => {
43 return respond_text(request, StatusCode(400), &format!("invalid batch: {error}"));
44 }
45 };
46 let ack = {
47 let s = lock_store(store);
48 match ingest_sync_batch(&s, &batch) {
49 Ok(ack) => ack,
50 Err(error) => {
51 return respond_text(request, StatusCode(400), &error.to_string());
52 }
53 }
54 };
55 return respond_json(request, StatusCode(200), &ack);
56 }
57
58 if method != Method::Get {
59 return respond_text(request, StatusCode(405), "method not allowed");
60 }
61
62 let s = lock_store(store);
63 let payload = match url.as_str() {
64 "/health" => json!({"status": "ok"}),
65 "/status" => json!({
66 "events": s.event_count()?,
67 "tokens": s.token_total()?
68 }),
69 "/sources" => serde_json::to_value(s.list_sources()?)?,
70 "/accounts" => serde_json::to_value(s.list_accounts()?)?,
71 "/source-account-assignments" => {
72 serde_json::to_value(s.list_source_account_assignments()?)?
73 }
74 "/subscriptions" => serde_json::to_value(s.list_subscriptions()?)?,
75 "/reports/weekly" => json!({
76 "events": s.event_count()?,
77 "tokens": s.token_total()?
78 }),
79 _ => {
80 drop(s);
81 return respond_text(request, StatusCode(404), "not found");
82 }
83 };
84 drop(s);
85
86 respond_json(request, StatusCode(200), &payload)
87}
88
89pub fn ingest_sync_batch(store: &Store, batch: &SyncBatch) -> Result<SyncAck> {
90 if batch.schema_version != SYNC_BATCH_SCHEMA_VERSION {
91 bail!("unsupported sync batch schema {}", batch.schema_version);
92 }
93
94 for source in &batch.sources {
95 store.upsert_source(source)?;
96 }
97 for account in &batch.accounts {
98 store.upsert_account(account)?;
99 }
100 for assignment in &batch.source_account_assignments {
101 store.upsert_source_account_assignment(assignment)?;
102 }
103 for subscription in &batch.subscriptions {
104 store.upsert_subscription(subscription)?;
105 }
106 let inserted_events = store.insert_events(&batch.events)?;
107 let written_summaries = store.upsert_summaries(&batch.summaries)?;
108
109 Ok(SyncAck {
110 schema_version: SYNC_ACK_SCHEMA_VERSION.to_string(),
111 batch_id: batch.batch_id.clone(),
112 accepted: SyncEntityCounts {
113 sources: batch.sources.len() as u64,
114 accounts: batch.accounts.len() as u64,
115 source_account_assignments: batch.source_account_assignments.len() as u64,
116 subscriptions: batch.subscriptions.len() as u64,
117 events: inserted_events,
118 summaries: written_summaries,
119 },
120 duplicates: SyncEntityCounts {
121 sources: 0,
122 accounts: 0,
123 source_account_assignments: 0,
124 subscriptions: 0,
125 events: (batch.events.len() as u64).saturating_sub(inserted_events),
126 summaries: 0,
127 },
128 rejected: Vec::<SyncRejectedRecord>::new(),
129 })
130}
131
132fn respond_json<T: serde::Serialize>(
133 request: Request,
134 status: StatusCode,
135 payload: &T,
136) -> Result<()> {
137 let body = serde_json::to_string_pretty(payload)?;
138 let response = Response::from_string(body)
139 .with_status_code(status)
140 .with_header(content_type_json());
141 request.respond(response)?;
142 Ok(())
143}
144
145fn respond_text(request: Request, status: StatusCode, body: &str) -> Result<()> {
146 let response = Response::from_string(body).with_status_code(status);
147 request.respond(response)?;
148 Ok(())
149}
150
151fn content_type_json() -> Header {
152 Header::from_bytes("content-type", "application/json").expect("static header is valid")
153}
154
155#[cfg(feature = "watch")]
156mod watch {
157 use anyhow::{Context, Result};
158 use chrono::{DateTime, Utc};
159 use notify::{Event, EventKind, RecursiveMode, Watcher};
160 use statsai_adapters::{default_adapters, ProviderAdapter, ScanCandidateFile, ScanOptions};
161 use statsai_core::{
162 timestamp_in_period, IdentitySource, ProviderAccountId, SourceAccountAssignment,
163 SourceKind, SourceLocation, SourceVerificationMode, UsageEvent, UsageSummary,
164 };
165 use statsai_store::{
166 effective_verified_source_state_is_missing, has_active_verified_source_assignment,
167 reconcile_verified_source_state, verified_source_state_hash, ScanFileStateEntry, Store,
168 };
169 use std::collections::HashSet;
170 use std::path::PathBuf;
171 use std::sync::mpsc;
172 use std::sync::{Arc, Mutex};
173 use std::time::Duration;
174 use tiny_http::Server;
175
176 pub fn watch_and_serve(addr: &str, store: Arc<Mutex<Store>>, device_id: &str) -> Result<()> {
177 super::ensure_loopback(addr)?;
178
179 let sources = {
180 let s = super::lock_store(&store);
181 discover_watch_sources(&s)
182 };
183 let (tx, rx) = mpsc::channel();
184
185 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
186 if let Ok(event) = res {
187 if matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_)) {
188 let changed: Vec<PathBuf> = event.paths;
189 let _ = tx.send(changed);
190 }
191 }
192 })
193 .context("create file watcher")?;
194
195 for path in &sources {
196 if let Err(e) = watcher.watch(path, RecursiveMode::Recursive) {
197 eprintln!("daemon: cannot watch {}: {e}", path.display());
198 } else {
199 eprintln!("daemon: watching {}", path.display());
200 }
201 }
202
203 eprintln!("daemon: API listening on http://{addr}");
204 let server = Server::http(addr)
205 .map_err(|err| anyhow::anyhow!("start local API on {addr}: {err}"))?;
206
207 loop {
208 match rx.recv_timeout(Duration::from_millis(250)) {
209 Ok(changed) => {
210 let s = super::lock_store(&store);
211 rescan_changed_sources(&s, device_id, &changed);
212 }
213 Err(mpsc::RecvTimeoutError::Timeout) => {}
214 Err(mpsc::RecvTimeoutError::Disconnected) => break,
215 }
216
217 if let Ok(Some(request)) = server.try_recv() {
218 super::handle_request(request, &store)?;
219 }
220 }
221
222 Ok(())
223 }
224
225 fn discover_watch_sources(store: &Store) -> Vec<PathBuf> {
226 let mut paths = Vec::new();
227
228 if let Ok(configured) = store.list_sources() {
229 for source in configured {
230 if source.source_kind != SourceKind::LocalAdapter {
231 continue;
232 }
233 if let Some(label) = source.path_label.as_deref().filter(|p| !p.is_empty()) {
234 let path = PathBuf::from(label);
235 if path.is_dir() && !paths.contains(&path) {
236 paths.push(path);
237 }
238 }
239 }
240 }
241
242 for adapter in default_adapters() {
243 for source in adapter.discover() {
244 if source.source_kind != SourceKind::LocalAdapter {
245 continue;
246 }
247 if let Some(label) = source.path_label.as_deref().filter(|p| !p.is_empty()) {
248 let path = PathBuf::from(label);
249 if path.is_dir() && !paths.contains(&path) {
250 paths.push(path);
251 }
252 }
253 }
254 }
255
256 paths
257 }
258
    /// Rescans the sources affected by `changed` using the default adapter set.
    ///
    /// Thin wrapper around `rescan_changed_sources_with_adapters`.
    fn rescan_changed_sources(store: &Store, device_id: &str, changed: &[PathBuf]) {
        let adapters: Vec<Box<dyn ProviderAdapter>> = default_adapters();
        rescan_changed_sources_with_adapters(store, device_id, changed, &adapters);
    }
263
    /// Rescans every source touched by `changed`, for each of `adapters`.
    ///
    /// For each matching source this:
    /// 1. asks the adapter for scan candidates and the store for the pending
    ///    file entries among them;
    /// 2. probes the verified source state (unless verification is disabled)
    ///    and, in `Auto` mode, computes its hash;
    /// 3. skips the source when there are no pending files and no verified-state
    ///    work to do;
    /// 4. scans only when files are pending, reconciles the verified state,
    ///    persists the source, resolves accounts, stores events and summaries,
    ///    and finally records the scanned file entries.
    ///
    /// Nothing is returned: every failure is logged to stderr and the current
    /// source is skipped (or, if sources cannot be listed, the call returns).
    fn rescan_changed_sources_with_adapters(
        store: &Store,
        device_id: &str,
        changed: &[PathBuf],
        adapters: &[Box<dyn ProviderAdapter>],
    ) {
        let configured = match store.list_sources() {
            Ok(sources) => sources,
            Err(e) => {
                eprintln!("daemon: failed to list sources: {e}");
                return;
            }
        };

        for adapter in adapters {
            let sources = scan_sources_for_paths(adapter.as_ref(), &configured, changed);
            for mut source in sources {
                let cache_candidates = match adapter.scan_candidates(&source) {
                    Ok(candidates) => candidates,
                    Err(e) => {
                        eprintln!(
                            "daemon: scan candidate discovery failed for {}: {e}",
                            source.path_label.as_deref().unwrap_or("unknown")
                        );
                        continue;
                    }
                };
                let file_cache_entries = scan_file_state_entries(&cache_candidates);
                // Entries the store reports as pending (presumably new or changed
                // since the last recorded scan — confirm against `Store`).
                let pending_file_entries =
                    match store.pending_scan_file_entries(&source.source_id, &file_cache_entries) {
                        Ok(entries) => entries,
                        Err(e) => {
                            eprintln!(
                                "daemon: scan cache lookup failed for {}: {e}",
                                source.path_label.as_deref().unwrap_or("unknown")
                            );
                            continue;
                        }
                    };
                let verification_mode = source.verification_mode.clone();
                // The probe is skipped entirely when verification is disabled.
                let probed_verified_source_state =
                    if matches!(verification_mode, SourceVerificationMode::Disabled) {
                        None
                    } else {
                        match adapter.probe_verified_source_state(&source) {
                            Ok(state) => state,
                            Err(e) => {
                                eprintln!(
                                    "daemon: verified auth probe failed for {}: {e}",
                                    source.path_label.as_deref().unwrap_or("unknown")
                                );
                                continue;
                            }
                        }
                    };
                // A hash is only computed in `Auto` mode; other modes carry `None`.
                let next_verified_state_hash =
                    if matches!(verification_mode, SourceVerificationMode::Auto) {
                        match verified_source_state_hash(probed_verified_source_state.as_ref()) {
                            Ok(hash) => hash,
                            Err(e) => {
                                eprintln!(
                                    "daemon: verified auth hash failed for {}: {e}",
                                    source.path_label.as_deref().unwrap_or("unknown")
                                );
                                continue;
                            }
                        }
                    } else {
                        None
                    };
                let verified_state_changed =
                    matches!(verification_mode, SourceVerificationMode::Auto)
                        && source.verified_state_hash != next_verified_state_hash;
                // `Auto` source with no stored hash, no new hash and a missing
                // effective state, but an active verified assignment in the store.
                // The store lookup is evaluated last because of `&&` short-circuiting.
                let legacy_verified_state_needs_reconciliation =
                    matches!(verification_mode, SourceVerificationMode::Auto)
                        && source.verified_state_hash.is_none()
                        && next_verified_state_hash.is_none()
                        && effective_verified_source_state_is_missing(
                            &probed_verified_source_state,
                        )
                        && match has_active_verified_source_assignment(store, &source.source_id) {
                            Ok(active) => active,
                            Err(e) => {
                                eprintln!(
                                    "daemon: verified assignment lookup failed for {}: {e}",
                                    source.path_label.as_deref().unwrap_or("unknown")
                                );
                                continue;
                            }
                        };
                // Nothing to scan and nothing to reconcile: leave the source alone.
                if pending_file_entries.is_empty()
                    && !verified_state_changed
                    && !legacy_verified_state_needs_reconciliation
                {
                    continue;
                }
                // Restrict the scan to the cache keys of the pending entries.
                let options = ScanOptions {
                    device_id: device_id.to_string(),
                    selected_cache_keys: Some(
                        pending_file_entries
                            .iter()
                            .map(|entry| entry.cache_key.clone())
                            .collect::<HashSet<_>>(),
                    ),
                };
                // Without pending files the adapter is not invoked; an empty scan
                // stands in so the reconciliation below still runs.
                let scan_result = if pending_file_entries.is_empty() {
                    Ok(statsai_adapters::AdapterScan::default())
                } else {
                    adapter.scan(&source, &options)
                };
                match scan_result {
                    Ok(mut scan) => {
                        let parsed_events = scan.events.len();
                        let parsed_summaries = scan.summaries.len();
                        // Prefer the state reported by the scan itself and fall back
                        // to the probe; with no scan only the probe is available.
                        let effective_verified_source_state =
                            if matches!(verification_mode, SourceVerificationMode::Disabled) {
                                None
                            } else if pending_file_entries.is_empty() {
                                probed_verified_source_state
                            } else {
                                scan.verified_source_state
                                    .take()
                                    .or(probed_verified_source_state)
                            };
                        if let Err(e) = reconcile_verified_source_state(
                            store,
                            &mut source,
                            effective_verified_source_state.as_ref(),
                            next_verified_state_hash,
                        ) {
                            eprintln!("daemon: verified auth reconciliation failed: {e}");
                            continue;
                        }
                        if let Err(e) = store.upsert_source(&source) {
                            eprintln!("daemon: update source verified auth state failed: {e}");
                            continue;
                        }
                        // Auth-only pass: there is no usage data to store.
                        if pending_file_entries.is_empty() {
                            eprintln!(
                                "daemon: reconciled auth state for {} ({})",
                                source.provider,
                                source.path_label.as_deref().unwrap_or("unknown")
                            );
                            continue;
                        }
                        if let Err(e) = apply_source_account_resolution(
                            store,
                            &source,
                            &mut scan.events,
                            &mut scan.summaries,
                        ) {
                            eprintln!("daemon: account resolution failed: {e}");
                            continue;
                        }
                        let inserted_events = match store.insert_events(&scan.events) {
                            Ok(count) => count,
                            Err(e) => {
                                eprintln!("daemon: insert events failed: {e}");
                                continue;
                            }
                        };
                        let written_summaries = match store.upsert_summaries(&scan.summaries) {
                            Ok(count) => count,
                            Err(e) => {
                                eprintln!("daemon: insert summaries failed: {e}");
                                continue;
                            }
                        };
                        // Recorded only after events and summaries were stored; any
                        // earlier failure skips this step via `continue`.
                        if let Err(e) =
                            store.record_scan_file_entries(&source.source_id, &pending_file_entries)
                        {
                            eprintln!("daemon: update scan cache failed: {e}");
                            continue;
                        }
                        eprintln!(
                            "daemon: rescanned {} ({}) — files={}, cached={}, parsed_events={}, inserted_events={}, parsed_summaries={}, summaries_written={}",
                            source.provider,
                            source.path_label.as_deref().unwrap_or("unknown"),
                            scan.diagnostics.files_scanned,
                            scan.diagnostics.files_skipped_unchanged,
                            parsed_events,
                            inserted_events,
                            parsed_summaries,
                            written_summaries
                        );
                    }
                    Err(e) => {
                        eprintln!(
                            "daemon: scan failed for {}: {e}",
                            source.path_label.as_deref().unwrap_or("unknown")
                        );
                    }
                }
            }
        }
    }
460
461 fn scan_sources_for_paths(
462 adapter: &dyn ProviderAdapter,
463 configured: &[SourceLocation],
464 changed: &[PathBuf],
465 ) -> Vec<SourceLocation> {
466 let mut sources = Vec::new();
467 for source in configured
468 .iter()
469 .filter(|s| {
470 s.enabled
471 && s.source_kind == SourceKind::LocalAdapter
472 && s.provider == adapter.provider()
473 })
474 .cloned()
475 {
476 if source.path_label.is_some() && source_in_changed_paths(&source, changed) {
477 sources.push(source);
478 }
479 }
480 for source in adapter.discover() {
481 if source.source_kind != SourceKind::LocalAdapter || source.path_label.is_none() {
482 continue;
483 }
484 if source_in_changed_paths(&source, changed)
485 && !sources.iter().any(|s| s.source_id == source.source_id)
486 {
487 sources.push(source);
488 }
489 }
490 sources
491 }
492
493 fn source_in_changed_paths(source: &SourceLocation, changed: &[PathBuf]) -> bool {
494 let Some(label) = source.path_label.as_deref() else {
495 return false;
496 };
497 let source_path = PathBuf::from(label);
498 changed.iter().any(|changed_path| {
499 changed_path.starts_with(&source_path) || source_path.starts_with(changed_path)
500 })
501 }
502
503 fn scan_file_state_entries(candidates: &[ScanCandidateFile]) -> Vec<ScanFileStateEntry> {
504 candidates
505 .iter()
506 .map(|candidate| ScanFileStateEntry {
507 cache_key: candidate.cache_key.clone(),
508 cache_signature: candidate.cache_signature.clone(),
509 })
510 .collect()
511 }
512
513 fn apply_source_account_resolution(
514 store: &Store,
515 source: &SourceLocation,
516 events: &mut [UsageEvent],
517 summaries: &mut [UsageSummary],
518 ) -> Result<()> {
519 let assignments = store.list_source_account_assignments_for_source(&source.source_id)?;
520 for event in events {
521 apply_account_resolution_to_event(&assignments, event);
522 }
523 for summary in summaries {
524 apply_account_resolution_to_summary(&assignments, summary);
525 }
526 Ok(())
527 }
528
529 fn apply_account_resolution_to_event(
530 assignments: &[SourceAccountAssignment],
531 event: &mut UsageEvent,
532 ) {
533 if keep_detected_account_identity(
534 event.provider_account_id.as_ref(),
535 event
536 .parse_evidence
537 .as_ref()
538 .map(|evidence| &evidence.account_identity_source),
539 ) {
540 return;
541 }
542 let assignment = assignment_for_timestamp(assignments, event.session.started_at);
543 if let Some(assignment) = assignment {
544 event.provider_account_id = Some(assignment.provider_account_id.clone());
545 if let Some(evidence) = event.parse_evidence.as_mut() {
546 evidence.account_identity_source = IdentitySource::SourceConfig;
547 }
548 } else if should_clear_resolved_account(
549 event.provider_account_id.as_ref(),
550 event
551 .parse_evidence
552 .as_ref()
553 .map(|evidence| &evidence.account_identity_source),
554 ) {
555 event.provider_account_id = None;
556 if let Some(evidence) = event.parse_evidence.as_mut() {
557 evidence.account_identity_source = IdentitySource::Unresolved;
558 }
559 }
560 }
561
562 fn apply_account_resolution_to_summary(
563 assignments: &[SourceAccountAssignment],
564 summary: &mut UsageSummary,
565 ) {
566 if keep_detected_account_identity(
567 summary.provider_account_id.as_ref(),
568 summary
569 .parse_evidence
570 .as_ref()
571 .map(|evidence| &evidence.account_identity_source),
572 ) {
573 return;
574 }
575 let timestamp = summary.period_start.unwrap_or(summary.observed_at);
576 let assignment = assignment_for_timestamp(assignments, timestamp);
577 if let Some(assignment) = assignment {
578 summary.provider_account_id = Some(assignment.provider_account_id.clone());
579 if let Some(evidence) = summary.parse_evidence.as_mut() {
580 evidence.account_identity_source = IdentitySource::SourceConfig;
581 }
582 } else if should_clear_resolved_account(
583 summary.provider_account_id.as_ref(),
584 summary
585 .parse_evidence
586 .as_ref()
587 .map(|evidence| &evidence.account_identity_source),
588 ) {
589 summary.provider_account_id = None;
590 if let Some(evidence) = summary.parse_evidence.as_mut() {
591 evidence.account_identity_source = IdentitySource::Unresolved;
592 }
593 }
594 }
595
596 fn keep_detected_account_identity(
597 provider_account_id: Option<&ProviderAccountId>,
598 identity_source: Option<&IdentitySource>,
599 ) -> bool {
600 let Some(provider_account_id) = provider_account_id else {
601 return false;
602 };
603 if provider_account_id.0.trim().is_empty() {
604 return false;
605 }
606 let Some(identity_source) = identity_source else {
607 return false;
608 };
609 !matches!(
610 identity_source,
611 IdentitySource::SourceConfig
612 | IdentitySource::UserConfigured
613 | IdentitySource::ManualHint
614 | IdentitySource::Unknown
615 | IdentitySource::Unresolved
616 )
617 }
618
619 fn should_clear_resolved_account(
620 provider_account_id: Option<&ProviderAccountId>,
621 identity_source: Option<&IdentitySource>,
622 ) -> bool {
623 let Some(provider_account_id) = provider_account_id else {
624 return false;
625 };
626 if provider_account_id.0.trim().is_empty() {
627 return false;
628 }
629 matches!(
630 identity_source,
631 None | Some(
632 IdentitySource::SourceConfig
633 | IdentitySource::UserConfigured
634 | IdentitySource::ManualHint
635 | IdentitySource::Unknown
636 | IdentitySource::Unresolved
637 )
638 )
639 }
640
641 fn assignment_for_timestamp(
642 assignments: &[SourceAccountAssignment],
643 timestamp: DateTime<Utc>,
644 ) -> Option<&SourceAccountAssignment> {
645 assignments
646 .iter()
647 .filter(|assignment| {
648 timestamp_in_period(timestamp, assignment.started_at, assignment.ended_at)
649 })
650 .max_by(|left, right| left.started_at.cmp(&right.started_at))
651 }
652
653 #[cfg(test)]
654 mod tests {
655 use super::*;
656 use chrono::TimeZone;
657 use statsai_core::{
658 BillingPeriod, LocationOrigin, SubscriptionStatus, VerifiedSourceState,
659 VerifiedSubscriptionState,
660 };
661 use std::sync::{Arc, Mutex};
662
        /// Minimal `ProviderAdapter` double for the watch tests.
        struct TestAdapter {
            // Provider name reported by `provider()`.
            provider: &'static str,
            // State handed back (cloned) by `probe_verified_source_state`.
            verified_state: Option<VerifiedSourceState>,
            // Shared counter incremented on every `scan` call.
            scan_calls: Arc<Mutex<u64>>,
        }
668
669 impl ProviderAdapter for TestAdapter {
670 fn id(&self) -> &'static str {
671 "test-watch-adapter"
672 }
673
674 fn version(&self) -> &'static str {
675 "0.0.0"
676 }
677
678 fn provider(&self) -> &'static str {
679 self.provider
680 }
681
682 fn discover(&self) -> Vec<SourceLocation> {
683 Vec::new()
684 }
685
686 fn scan_candidates(&self, _source: &SourceLocation) -> Result<Vec<ScanCandidateFile>> {
687 Ok(Vec::new())
688 }
689
690 fn probe_verified_source_state(
691 &self,
692 _source: &SourceLocation,
693 ) -> Result<Option<VerifiedSourceState>> {
694 Ok(self.verified_state.clone())
695 }
696
697 fn scan(
698 &self,
699 _source: &SourceLocation,
700 _options: &ScanOptions,
701 ) -> Result<statsai_adapters::AdapterScan> {
702 *self.scan_calls.lock().expect("scan calls") += 1;
703 Ok(statsai_adapters::AdapterScan::default())
704 }
705 }
706
        /// A change under an `Auto`-verified source with no pending usage files
        /// must still reconcile the probed auth state: no adapter scan runs, yet
        /// one account, one subscription and one open-ended assignment are stored
        /// and the source gets a verified-state hash.
        #[test]
        fn rescan_changed_sources_reconciles_verified_auth_without_pending_usage_files() {
            let store = Store::in_memory().expect("store");
            // The source root must exist on disk; the process id keeps it unique
            // per test process.
            let root =
                std::env::temp_dir().join(format!("statsai-watch-auth-{}", std::process::id()));
            std::fs::create_dir_all(&root).expect("temp source root");
            let mut source = SourceLocation::local_adapter(
                "codex",
                "test",
                "0",
                &root,
                LocationOrigin::Configured,
            );
            source.verification_mode = SourceVerificationMode::Auto;
            store.upsert_source(&source).expect("source");

            let authenticated_at = Utc
                .with_ymd_and_hms(2026, 5, 29, 10, 12, 43)
                .single()
                .expect("authenticated_at");
            let verified_at = Utc
                .with_ymd_and_hms(2026, 5, 29, 10, 14, 56)
                .single()
                .expect("verified_at");
            let current_period_ends_at = Utc
                .with_ymd_and_hms(2026, 6, 29, 10, 12, 43)
                .single()
                .expect("current_period_ends_at");
            let scan_calls = Arc::new(Mutex::new(0u64));
            // The adapter offers no scan candidates, so only the probed state below
            // can drive any work.
            let adapters: Vec<Box<dyn ProviderAdapter>> = vec![Box::new(TestAdapter {
                provider: "codex",
                verified_state: Some(VerifiedSourceState {
                    provider_user_id: Some("acct-watch".to_string()),
                    email: Some("watch@example.com".to_string()),
                    account_label: None,
                    plan_name: Some("Plus".to_string()),
                    authenticated_at: Some(authenticated_at),
                    verified_at: Some(verified_at),
                    subscription: Some(VerifiedSubscriptionState {
                        plan_name: "Plus".to_string(),
                        price: 2000,
                        currency: "USD".to_string(),
                        billing_period: BillingPeriod::Monthly,
                        paid_at: Some(authenticated_at),
                        started_at: authenticated_at,
                        ended_at: Some(current_period_ends_at),
                        current_period_ends_at: Some(current_period_ends_at),
                        status: SubscriptionStatus::Active,
                        verified_at: Some(verified_at),
                    }),
                }),
                scan_calls: scan_calls.clone(),
            })];

            // Report a changed file located inside the source root.
            rescan_changed_sources_with_adapters(
                &store,
                "device-test",
                &[
                    PathBuf::from(source.path_label.as_deref().expect("path label"))
                        .join("auth.json"),
                ],
                &adapters,
            );

            // No usage files were pending, so the adapter must not have scanned.
            assert_eq!(*scan_calls.lock().expect("scan calls"), 0);
            assert_eq!(store.list_accounts().expect("accounts").len(), 1);
            assert_eq!(store.list_subscriptions().expect("subscriptions").len(), 1);
            let assignments = store
                .list_source_account_assignments_for_source(&source.source_id)
                .expect("assignments");
            assert_eq!(assignments.len(), 1);
            assert_eq!(assignments[0].started_at, authenticated_at);
            assert_eq!(assignments[0].ended_at, None);
            assert_eq!(assignments[0].record_source, IdentitySource::LocalAuth);
            let stored_source = store
                .source(&source.source_id)
                .expect("source")
                .expect("stored source");
            assert!(stored_source.verified_state_hash.is_some());

            // Best-effort cleanup; not reached when an assertion above fails.
            let _ = std::fs::remove_dir_all(&root);
        }
789 }
790}
791
792#[cfg(not(feature = "watch"))]
793pub fn watch_and_serve(_addr: &str, _store: Arc<Mutex<Store>>, _device_id: &str) -> Result<()> {
794 anyhow::bail!(
795 "daemon --watch requires the `watch` cargo feature (enable with --features watch)"
796 )
797}
798
/// Public entry point for watch mode when the `watch` feature is enabled.
///
/// Forwards all arguments to `watch::watch_and_serve` and returns its result.
#[cfg(feature = "watch")]
pub fn watch_and_serve(addr: &str, store: Arc<Mutex<Store>>, device_id: &str) -> Result<()> {
    watch::watch_and_serve(addr, store, device_id)
}
803
804fn ensure_loopback(addr: &str) -> Result<()> {
805 let mut addrs = addr.to_socket_addrs()?;
806 let Some(addr) = addrs.next() else {
807 anyhow::bail!("local API address did not resolve");
808 };
809 if !addr.ip().is_loopback() {
810 anyhow::bail!("local API must bind to a loopback address");
811 }
812 Ok(())
813}
814
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a batch with the supported schema version and no records.
    fn empty_batch() -> SyncBatch {
        let schema_version = SYNC_BATCH_SCHEMA_VERSION.to_string();
        let created_at = Utc::now();
        SyncBatch {
            schema_version,
            batch_id: "batch_test".to_string(),
            device_id: "device_test".to_string(),
            sources: Vec::new(),
            accounts: Vec::new(),
            source_account_assignments: Vec::new(),
            subscriptions: Vec::new(),
            events: Vec::new(),
            summaries: Vec::new(),
            created_at,
        }
    }

    /// An empty batch is acknowledged with zero event counts and no rejections.
    #[test]
    fn ingest_empty_sync_batch_returns_ack() {
        let store = Store::in_memory().expect("store");
        let batch = empty_batch();
        let ack = ingest_sync_batch(&store, &batch).expect("ack");

        assert_eq!(ack.schema_version, SYNC_ACK_SCHEMA_VERSION);
        assert_eq!(ack.batch_id, "batch_test");
        assert_eq!(ack.accepted.events, 0);
        assert_eq!(ack.duplicates.events, 0);
        assert!(ack.rejected.is_empty());
    }

    /// A batch carrying an unknown schema version is refused.
    #[test]
    fn ingest_rejects_unsupported_schema() {
        let store = Store::in_memory().expect("store");
        let batch = SyncBatch {
            schema_version: "sync_batch.v0".to_string(),
            ..empty_batch()
        };

        let error = ingest_sync_batch(&store, &batch).expect_err("unsupported schema");
        let message = error.to_string();
        assert!(message.contains("unsupported sync batch schema"));
    }
}