Skip to main content

kernex_memory/store/
observations.rs

1//! Typed observation log (Phase D-agent Step 2.11 backbone).
2//!
3//! Read / write paths over the `observations` table introduced by
4//! migration 018. Mirrors the soft-delete discipline established in
5//! migration 017 for facts: the trait surface in
6//! [`crate::memory_store::MemoryStore`] exposes only the safe paths
7//! (`save`, `get`, `search`, `soft_delete`, `list_soft_deleted`);
8//! hard-delete is intentionally absent and reserved for emergency
9//! cleanup via the inherent surface.
10
11use std::time::SystemTime;
12
13use super::Store;
14use crate::error::MemoryError;
15use crate::observation::{Observation, ObservationType, SaveEntry};
16use crate::types::{format_sqlite_timestamp, parse_sqlite_timestamp};
17use uuid::Uuid;
18
19/// Tuple shape pulled by all SELECTs. Order matches the column order
20/// declared in migration `018_observations.sql`.
21type ObservationTuple = (
22    String,         // id
23    String,         // sender_id
24    String,         // type (as DB string)
25    String,         // title
26    Option<String>, // what
27    Option<String>, // why
28    Option<String>, // where_field
29    Option<String>, // learned
30    String,         // created_at (SQLite timestamp string)
31    String,         // updated_at
32);
33
34fn tuple_to_observation(row: ObservationTuple) -> Result<Observation, MemoryError> {
35    let (id, sender_id, kind_str, title, what, why, where_field, learned, created_at, updated_at) =
36        row;
37    let kind = ObservationType::from_db_str(&kind_str).ok_or_else(|| {
38        MemoryError::logic(format!(
39            "observation row {id} carries unknown type `{kind_str}` not in the current ObservationType enum"
40        ))
41    })?;
42    Ok(Observation {
43        id,
44        sender_id,
45        kind,
46        title,
47        what,
48        why,
49        where_field,
50        learned,
51        created_at: parse_sqlite_timestamp(&created_at)?,
52        updated_at: parse_sqlite_timestamp(&updated_at)?,
53    })
54}
55
56impl Store {
57    /// Persist a typed observation. Generates a fresh UUIDv4 id and
58    /// sets `created_at == updated_at == now`. Returns the saved row.
59    ///
60    /// The DB enforces two CHECK constraints that surface as
61    /// `MemoryError::Sqlite { source: sqlx::Error::Database(..), .. }`:
62    /// - `length(title) > 0`
63    /// - `type IN (<seven enum strings>)` (only reachable if the caller
64    ///   bypasses `ObservationType` and writes a raw string)
65    pub async fn save_observation(&self, entry: SaveEntry) -> Result<Observation, MemoryError> {
66        let id = Uuid::new_v4().to_string();
67        let now = SystemTime::now();
68        let now_str = format_sqlite_timestamp(now);
69        let kind_str = entry.kind.as_db_str();
70
71        sqlx::query(
72            "INSERT INTO observations \
73                (id, sender_id, type, title, what, why, where_field, learned, created_at, updated_at) \
74             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
75        )
76        .bind(&id)
77        .bind(&entry.sender_id)
78        .bind(kind_str)
79        .bind(&entry.title)
80        .bind(&entry.what)
81        .bind(&entry.why)
82        .bind(&entry.where_field)
83        .bind(&entry.learned)
84        .bind(&now_str)
85        .bind(&now_str)
86        .execute(&self.pool)
87        .await
88        .map_err(|e| MemoryError::sqlite("insert observation failed", e))?;
89
90        Ok(Observation {
91            id,
92            sender_id: entry.sender_id,
93            kind: entry.kind,
94            title: entry.title,
95            what: entry.what,
96            why: entry.why,
97            where_field: entry.where_field,
98            learned: entry.learned,
99            created_at: now,
100            updated_at: now,
101        })
102    }
103
104    /// Fetch an active observation by id. Returns `None` when the id
105    /// is missing OR the row is soft-deleted (CC-9 invariant).
106    pub async fn get_observation_by_id(
107        &self,
108        id: &str,
109    ) -> Result<Option<Observation>, MemoryError> {
110        let row: Option<ObservationTuple> = sqlx::query_as(
111            "SELECT id, sender_id, type, title, what, why, where_field, learned, \
112                    created_at, updated_at \
113             FROM observations \
114             WHERE id = ? AND deleted_at IS NULL",
115        )
116        .bind(id)
117        .fetch_optional(&self.pool)
118        .await
119        .map_err(|e| MemoryError::sqlite("get_observation_by_id failed", e))?;
120
121        match row {
122            Some(tuple) => Ok(Some(tuple_to_observation(tuple)?)),
123            None => Ok(None),
124        }
125    }
126
127    /// FTS5 search across `title`, `what`, `why`, `where_field`,
128    /// `learned`. Optional `since` filters by `created_at >=`; optional
129    /// `kind` narrows to a single observation type. Soft-deleted rows
130    /// never appear (the FTS5 triggers on `observations_au` /
131    /// `observations_ad` keep the mirror in sync).
132    ///
133    /// Result order: FTS5 `rank` ascending (best match first), then
134    /// `created_at` descending tiebreaker.
135    pub async fn search_observations(
136        &self,
137        query: &str,
138        sender_id: &str,
139        limit: i64,
140        since: Option<SystemTime>,
141        kind: Option<ObservationType>,
142    ) -> Result<Vec<Observation>, MemoryError> {
143        if query.len() < 3 {
144            return Ok(Vec::new());
145        }
146
147        // Double-quote the query to neutralize FTS5 operators (AND, OR,
148        // NOT, NEAR, *, etc.) and escape internal double quotes by
149        // doubling them, matching the pattern in messages.rs.
150        let sanitized = format!("\"{}\"", query.replace('"', "\"\""));
151
152        // Build the SQL dynamically to keep parameter binds clean.
153        // sqlx caches prepared statements per (statement, parameter
154        // shape); the 4 combinations of (since, kind) become 4 cache
155        // entries, the same cardinality as messages.rs's 2 variants for
156        // `since`.
157        let mut sql = String::from(
158            "SELECT o.id, o.sender_id, o.type, o.title, o.what, o.why, \
159                    o.where_field, o.learned, o.created_at, o.updated_at \
160             FROM observations_fts fts \
161             JOIN observations o ON o.rowid = fts.rowid \
162             WHERE observations_fts MATCH ? \
163             AND o.sender_id = ? \
164             AND o.deleted_at IS NULL",
165        );
166        if since.is_some() {
167            sql.push_str(" AND o.created_at >= ?");
168        }
169        if kind.is_some() {
170            sql.push_str(" AND o.type = ?");
171        }
172        sql.push_str(" ORDER BY rank, o.created_at DESC LIMIT ?");
173
174        let mut q = sqlx::query_as::<_, ObservationTuple>(&sql)
175            .bind(&sanitized)
176            .bind(sender_id);
177        if let Some(cutoff) = since {
178            q = q.bind(format_sqlite_timestamp(cutoff));
179        }
180        if let Some(k) = kind {
181            q = q.bind(k.as_db_str());
182        }
183        let rows: Vec<ObservationTuple> = q
184            .bind(limit)
185            .fetch_all(&self.pool)
186            .await
187            .map_err(|e| MemoryError::sqlite("search_observations failed", e))?;
188
189        let mut out = Vec::with_capacity(rows.len());
190        for tuple in rows {
191            out.push(tuple_to_observation(tuple)?);
192        }
193        Ok(out)
194    }
195
196    /// Soft-delete an observation by setting `deleted_at` to "now".
197    /// Returns `Ok(true)` if a row transitioned from active to deleted,
198    /// `Ok(false)` if the row was already deleted, missing, or never
199    /// existed (matches the `soft_delete_fact` contract).
200    ///
201    /// The `observations_au` trigger drops the row from
202    /// `observations_fts` automatically; consumers do not need to
203    /// touch the FTS mirror.
204    pub async fn soft_delete_observation(&self, id: &str) -> Result<bool, MemoryError> {
205        let now_str = format_sqlite_timestamp(SystemTime::now());
206        let result = sqlx::query(
207            "UPDATE observations SET deleted_at = ?, updated_at = ? \
208             WHERE id = ? AND deleted_at IS NULL",
209        )
210        .bind(&now_str)
211        .bind(&now_str)
212        .bind(id)
213        .execute(&self.pool)
214        .await
215        .map_err(|e| MemoryError::sqlite("soft delete observation failed", e))?;
216
217        Ok(result.rows_affected() > 0)
218    }
219
220    /// Read soft-deleted observations for a sender. Recovery helper;
221    /// the trait exposes this so future tooling can offer an
222    /// "undelete" surface without dropping back to the inherent
223    /// `Store`. Order: most-recently-deleted first.
224    pub async fn list_soft_deleted_observations(
225        &self,
226        sender_id: &str,
227    ) -> Result<Vec<Observation>, MemoryError> {
228        let rows: Vec<ObservationTuple> = sqlx::query_as(
229            "SELECT id, sender_id, type, title, what, why, where_field, learned, \
230                    created_at, updated_at \
231             FROM observations \
232             WHERE sender_id = ? AND deleted_at IS NOT NULL \
233             ORDER BY deleted_at DESC",
234        )
235        .bind(sender_id)
236        .fetch_all(&self.pool)
237        .await
238        .map_err(|e| MemoryError::sqlite("list_soft_deleted_observations failed", e))?;
239
240        let mut out = Vec::with_capacity(rows.len());
241        for tuple in rows {
242            out.push(tuple_to_observation(tuple)?);
243        }
244        Ok(out)
245    }
246
247    /// Count of active (non-soft-deleted) observations for a sender.
248    /// Used by the 4-tuple `get_memory_stats` extension; see
249    /// [`Self::get_memory_stats`].
250    pub(crate) async fn count_observations(&self, sender_id: &str) -> Result<i64, MemoryError> {
251        let (count,): (i64,) = sqlx::query_as(
252            "SELECT COUNT(*) FROM observations \
253             WHERE sender_id = ? AND deleted_at IS NULL",
254        )
255        .bind(sender_id)
256        .fetch_one(&self.pool)
257        .await
258        .map_err(|e| MemoryError::sqlite("count_observations failed", e))?;
259        Ok(count)
260    }
261}