Skip to main content

mirror_log/
decay.rs

1use rusqlite::{params, Connection, Result};
2use std::time::UNIX_EPOCH;
3
/// Age limit, in days since `last_accessed`, used by the flagging queries:
/// a row must be strictly older than this many days (times 86400 seconds)
/// to be considered for decay.
const DECAY_THRESHOLD_DAYS: i64 = 30;

/// Access-count limit used by the flagging queries: only rows whose
/// `access_count` is strictly below this value can be flagged.
const ACCESS_COUNT_THRESHOLD: i64 = 10;
6
7/// Initialize the decay tracking table
8pub fn init_decay_table(conn: &Connection) -> Result<()> {
9    conn.execute(
10        "CREATE TABLE IF NOT EXISTS decay (
11            event_id TEXT PRIMARY KEY,
12            access_count INTEGER NOT NULL DEFAULT 0,
13            last_accessed INTEGER NOT NULL,
14            pinned BOOLEAN NOT NULL DEFAULT 0,
15            created_at INTEGER NOT NULL DEFAULT (unixepoch())
16        )",
17        [],
18    )?;
19    Ok(())
20}
21
22/// Initialize the shadow events table
23pub fn init_shadow_table(conn: &Connection) -> Result<()> {
24    conn.execute(
25        "CREATE TABLE IF NOT EXISTS shadow_state (
26            event_id TEXT PRIMARY KEY,
27            decay_score REAL NOT NULL,
28            flagged_at INTEGER NOT NULL DEFAULT (unixepoch()),
29            FOREIGN KEY (event_id) REFERENCES events(id) ON DELETE CASCADE
30        )",
31        [],
32    )?;
33    Ok(())
34}
35
36/// Initialize decay-related tables
37pub fn init_decay_tables(conn: &Connection) -> Result<()> {
38    init_decay_table(conn)?;
39    init_shadow_table(conn)?;
40    Ok(())
41}
42
43/// Increment access count for an event and update last_accessed timestamp
44pub fn track_access(conn: &Connection, event_id: &str) -> Result<()> {
45    conn.execute(
46        "INSERT INTO decay (event_id, last_accessed)
47         VALUES (?1, unixepoch())
48         ON CONFLICT(event_id) DO UPDATE SET
49            access_count = decay.access_count + 1,
50            last_accessed = unixepoch(),
51            pinned = decay.pinned",
52        [event_id],
53    )?;
54    Ok(())
55}
56
57/// Get decay score for an event: access_count / days_since_logged
58pub fn get_decay_score(conn: &Connection, event_id: &str) -> Result<f64> {
59    let score: Option<f64> = conn.query_row(
60        "SELECT
61            CASE
62                WHEN last_accessed = 0 THEN 0
63                WHEN CAST((unixepoch() - last_accessed) / 86400 AS INTEGER) < 1 THEN CAST(access_count AS REAL)
64                ELSE CAST(access_count AS REAL) /
65                     CAST((unixepoch() - last_accessed) / 86400 AS REAL)
66            END
67         FROM decay
68         WHERE event_id = ?1",
69        [event_id],
70        |row| row.get(0),
71    )?;
72    Ok(score.unwrap_or(0.0))
73}
74
75/// Check if an event is flagged for decay (below threshold)
76pub fn is_flagged(conn: &Connection, event_id: &str) -> Result<bool> {
77    let flagged: bool = conn.query_row(
78        "SELECT EXISTS(
79            SELECT 1 FROM decay
80            WHERE event_id = ?1
81            AND access_count < ?2
82            AND (unixepoch() - last_accessed) > ?3 * 86400
83            AND pinned = 0
84        )",
85        params![event_id, ACCESS_COUNT_THRESHOLD, DECAY_THRESHOLD_DAYS],
86        |row| row.get(0),
87    )?;
88    Ok(flagged)
89}
90
91/// Get all events that meet decay criteria
92pub fn get_flagged_events(conn: &Connection) -> Result<Vec<String>> {
93    let mut ids = Vec::new();
94
95    let mut stmt = conn.prepare(
96        "SELECT e.id FROM events e
97         JOIN decay d ON e.id = d.event_id
98         LEFT JOIN shadow_state s ON e.id = s.event_id
99         WHERE d.access_count < ?
100         AND (unixepoch() - d.last_accessed) > ? * 86400
101         AND d.pinned = 0
102         AND s.event_id IS NULL
103         ORDER BY d.last_accessed ASC",
104    )?;
105
106    let rows = stmt.query_map(
107        params![ACCESS_COUNT_THRESHOLD, DECAY_THRESHOLD_DAYS],
108        |row| row.get(0),
109    )?;
110
111    for row in rows {
112        ids.push(row?);
113    }
114
115    Ok(ids)
116}
117
118/// Move flagged events to the shadow state so they no longer appear in normal queries.
119pub fn move_to_shadow(conn: &Connection) -> Result<usize> {
120    let flagged_ids = get_flagged_events(conn)?;
121
122    if flagged_ids.is_empty() {
123        return Ok(0);
124    }
125
126    // Get decay scores for flagged events
127    let mut scores: Vec<(String, f64)> = Vec::new();
128    for id in &flagged_ids {
129        if let Ok(score) = get_decay_score(conn, id) {
130            scores.push((id.clone(), score));
131        }
132    }
133
134    let tx = conn.unchecked_transaction()?;
135    let mut moved = 0;
136    for (id, score) in scores {
137        let now = UNIX_EPOCH.elapsed().unwrap().as_secs() as i64;
138
139        tx.execute(
140            "INSERT INTO shadow_state (event_id, decay_score, flagged_at)
141             VALUES (?1, ?2, ?3)
142             ON CONFLICT(event_id) DO UPDATE SET
143                decay_score = excluded.decay_score,
144                flagged_at = excluded.flagged_at",
145            params![id, score, now],
146        )?;
147
148        moved += 1;
149    }
150
151    tx.commit()?;
152    Ok(moved)
153}
154
155/// Get all shadowed events
156pub fn get_shadow_events(conn: &Connection) -> Result<Vec<ShadowEvent>> {
157    let mut events = Vec::new();
158
159    let mut stmt = conn.prepare(
160        "SELECT e.id, e.timestamp, e.source, e.content, e.meta, e.ingested_at, e.content_hash, s.decay_score, s.flagged_at
161         FROM shadow_state s
162         JOIN events e ON e.id = s.event_id
163         ORDER BY flagged_at DESC"
164    )?;
165
166    let rows = stmt.query_map([], |row| {
167        Ok(ShadowEvent {
168            id: row.get(0)?,
169            timestamp: row.get(1)?,
170            source: row.get(2)?,
171            content: row.get(3)?,
172            meta: row.get(4)?,
173            ingested_at: row.get(5)?,
174            content_hash: row.get(6)?,
175            decay_score: row.get(7)?,
176            flagged_at: row.get(8)?,
177        })
178    })?;
179
180    for row in rows {
181        events.push(row?);
182    }
183
184    Ok(events)
185}
186
187/// Pin an event (immune to decay)
188pub fn pin_event(conn: &Connection, event_id: &str) -> Result<()> {
189    conn.execute(
190        "UPDATE decay SET pinned = 1 WHERE event_id = ?1",
191        [event_id],
192    )?;
193    Ok(())
194}
195
196/// Unpin an event
197pub fn unpin_event(conn: &Connection, event_id: &str) -> Result<()> {
198    conn.execute(
199        "UPDATE decay SET pinned = 0 WHERE event_id = ?1",
200        [event_id],
201    )?;
202    Ok(())
203}
204
205/// Restore an event from shadow back to main table
206pub fn restore_from_shadow(conn: &Connection, event_id: &str) -> Result<()> {
207    conn.execute("DELETE FROM shadow_state WHERE event_id = ?1", [event_id])?;
208
209    // Reset decay tracking
210    conn.execute(
211        "INSERT INTO decay (event_id, access_count, last_accessed, pinned)
212         VALUES (?1, 0, unixepoch(), 0)
213         ON CONFLICT(event_id) DO UPDATE SET
214            access_count = 0,
215            last_accessed = unixepoch(),
216            pinned = 0",
217        [event_id],
218    )?;
219
220    Ok(())
221}
222
223/// Get decay statistics
224pub fn get_decay_stats(conn: &Connection) -> Result<DecayStats> {
225    let total_events: i64 = conn.query_row("SELECT COUNT(*) FROM decay", [], |row| row.get(0))?;
226
227    let total_access: i64 =
228        conn.query_row("SELECT SUM(access_count) FROM decay", [], |row| row.get(0))?;
229
230    let pinned_count: i64 =
231        conn.query_row("SELECT COUNT(*) FROM decay WHERE pinned = 1", [], |row| {
232            row.get(0)
233        })?;
234
235    let avg_access: f64 = if total_events > 0 {
236        total_access as f64 / total_events as f64
237    } else {
238        0.0
239    };
240
241    let flagged_count: i64 = conn.query_row(
242        "SELECT COUNT(*)
243         FROM decay d
244         LEFT JOIN shadow_state s ON d.event_id = s.event_id
245         WHERE d.access_count < ?1
246         AND (unixepoch() - d.last_accessed) > ?2 * 86400
247         AND d.pinned = 0
248         AND s.event_id IS NULL",
249        params![ACCESS_COUNT_THRESHOLD, DECAY_THRESHOLD_DAYS],
250        |row| row.get(0),
251    )?;
252
253    Ok(DecayStats {
254        total_events,
255        total_access,
256        pinned_count,
257        avg_access,
258        flagged_count,
259    })
260}
261
/// An event that has been moved to the shadow state, combining its `events`
/// record with the matching `shadow_state` row.
#[derive(Clone, Debug)]
pub struct ShadowEvent {
    /// Event id (`events.id`).
    pub id: String,
    /// Value of `events.timestamp` (unit not visible here; presumably Unix
    /// seconds, to be confirmed against the `events` schema).
    pub timestamp: i64,
    /// Value of `events.source`.
    pub source: String,
    /// Value of `events.content`.
    pub content: String,
    /// Value of `events.meta`, if any.
    pub meta: Option<String>,
    /// Value of `events.ingested_at`.
    pub ingested_at: i64,
    /// Value of `events.content_hash`, if any.
    pub content_hash: Option<String>,
    /// Decay score stored when the event was flagged.
    pub decay_score: f64,
    /// Time the event was flagged, in seconds since the Unix epoch.
    pub flagged_at: i64,
}
274
/// Summary figures over the `decay` table, as produced by `get_decay_stats`.
#[derive(Clone, Debug)]
pub struct DecayStats {
    /// Number of rows in `decay`.
    pub total_events: i64,
    /// Sum of `access_count` over all rows.
    pub total_access: i64,
    /// Number of rows with the pinned flag set.
    pub pinned_count: i64,
    /// `total_access / total_events`, or `0.0` when there are no rows.
    pub avg_access: f64,
    /// Number of rows meeting the decay criteria that are not yet shadowed.
    pub flagged_count: i64,
}