Skip to main content

orbok_workers/
storage.rs

1//! Storage accounting (RFC-011 §9): measures actual orbok storage
2//! consumption and updates the `storage_accounting` table.
3//!
4//! Measurements are approximate on purpose — exact byte-level
5//! accounting per row is expensive; page-level and aggregate-query
6//! measurements are fast and accurate enough for the Storage view.
7
8use orbok_core::{OrbokResult, StorageCategory};
9use orbok_db::Catalog;
10use orbok_db::repo::StorageAccountingRepository;
11use std::path::Path;
12
13/// Compute and persist storage accounting for the Storage view
14/// (RFC-011 §9 "approximate by default").
15///
16/// Called by the worker pipeline after each indexing run and by the
17/// Storage view's "refresh" action.
18pub fn update_storage_accounting(
19    catalog: &Catalog,
20    cache_db_path: &Path,
21) -> OrbokResult<Vec<(StorageCategory, u64, u64)>> {
22    let storage = StorageAccountingRepository::new(catalog);
23    let mut rows = Vec::new();
24
25    macro_rules! measure {
26        ($cat:expr, $size:expr, $count:expr) => {{
27            storage.upsert($cat, $size, $count)?;
28            rows.push(($cat, $size, $count));
29        }};
30    }
31
32    let conn = catalog.lock();
33
34    // Persistent catalog: approximate as the file size of catalog DB.
35    // If in-memory (:memory:), report 0.
36    let catalog_path = catalog.path();
37    let catalog_bytes = if catalog_path.to_str() == Some(":memory:") {
38        // Use page_count × page_size as proxy for in-memory databases.
39        let pages: i64 = conn
40            .query_row("PRAGMA page_count", [], |r| r.get(0))
41            .unwrap_or(0);
42        let page_size: i64 = conn
43            .query_row("PRAGMA page_size", [], |r| r.get(0))
44            .unwrap_or(4096);
45        (pages * page_size) as u64
46    } else {
47        std::fs::metadata(catalog_path)
48            .map(|m| m.len())
49            .unwrap_or(0)
50    };
51    // Source count for "items"
52    let source_count: i64 = conn
53        .query_row(
54            "SELECT COUNT(*) FROM sources WHERE status != 'removed'",
55            [],
56            |r| r.get(0),
57        )
58        .unwrap_or(0);
59    drop(conn); // release before re-acquiring below
60    measure!(
61        StorageCategory::PersistentCatalog,
62        catalog_bytes,
63        source_count as u64
64    );
65
66    // Keyword index: row count from keyword_index_records.
67    let conn = catalog.lock();
68    let kw_count: i64 = conn
69        .query_row(
70            "SELECT COUNT(*) FROM keyword_index_records WHERE status='active'",
71            [],
72            |r| r.get(0),
73        )
74        .unwrap_or(0);
75    // Approximate size: 256 bytes per token record (FTS overhead).
76    let kw_bytes = kw_count as u64 * 256;
77    drop(conn);
78    measure!(StorageCategory::KeywordIndex, kw_bytes, kw_count as u64);
79
80    // Vector index: actual BLOB sizes.
81    let conn = catalog.lock();
82    let (emb_count, emb_bytes): (i64, i64) = conn
83        .query_row(
84            "SELECT COUNT(*), COALESCE(SUM(LENGTH(vector_blob)), 0) FROM embeddings WHERE status='active'",
85            [],
86            |r| Ok((r.get(0)?, r.get(1)?)),
87        )
88        .unwrap_or((0, 0));
89    drop(conn);
90    measure!(
91        StorageCategory::VectorIndex,
92        emb_bytes as u64,
93        emb_count as u64
94    );
95
96    // Snippet cache: stored size_bytes column.
97    let conn = catalog.lock();
98    let (snip_count, snip_bytes): (i64, i64) = conn
99        .query_row(
100            "SELECT COUNT(*), COALESCE(SUM(size_bytes), 0) FROM snippet_cache",
101            [],
102            |r| Ok((r.get(0)?, r.get(1)?)),
103        )
104        .unwrap_or((0, 0));
105    drop(conn);
106    measure!(
107        StorageCategory::SnippetCache,
108        snip_bytes as u64,
109        snip_count as u64
110    );
111
112    // Search cache: row count (size unknown; estimate 512 bytes each).
113    let conn = catalog.lock();
114    let sr_count: i64 = conn
115        .query_row("SELECT COUNT(*) FROM search_result_cache", [], |r| r.get(0))
116        .unwrap_or(0);
117    drop(conn);
118    measure!(
119        StorageCategory::SearchCache,
120        sr_count as u64 * 512,
121        sr_count as u64
122    );
123
124    // Temporary extraction: localcache DB file size.
125    let cache_bytes = std::fs::metadata(cache_db_path)
126        .map(|m| m.len())
127        .unwrap_or(0);
128    let conn = catalog.lock();
129    let extract_count: i64 = conn
130        .query_row(
131            "SELECT COUNT(*) FROM extraction_records WHERE status='succeeded'",
132            [],
133            |r| r.get(0),
134        )
135        .unwrap_or(0);
136    drop(conn);
137    measure!(
138        StorageCategory::TemporaryExtraction,
139        cache_bytes,
140        extract_count as u64
141    );
142
143    // Logs: app_events row estimate.
144    let conn = catalog.lock();
145    let evt_count: i64 = conn
146        .query_row("SELECT COUNT(*) FROM app_events", [], |r| r.get(0))
147        .unwrap_or(0);
148    drop(conn);
149    measure!(
150        StorageCategory::Logs,
151        evt_count as u64 * 256,
152        evt_count as u64
153    );
154
155    // Model files: not tracked in v0.4 (full workflow lands in M12).
156    measure!(StorageCategory::ModelFiles, 0, 0);
157
158    Ok(rows)
159}