Skip to main content

eros_engine_store/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! Postgres + pgvector persistence layer.
3
4pub mod affinity;
5pub mod chat;
6pub mod decision;
7pub mod error_handling;
8pub mod human_insight;
9pub mod insight;
10pub mod memory;
11pub mod persona;
12pub mod pool;
13
14pub use sqlx::PgPool;
15
16/// OpenRouter call metadata captured for the audit columns on event tables
17/// (`companion_insights_events`, `companion_affinity_events`). All optional —
18/// a non-LLM event (e.g. a gift affinity event) carries the default (all None).
19#[derive(Debug, Clone, Default)]
20pub struct OpenRouterCallMeta {
21    pub generation_id: Option<String>,
22    pub model: Option<String>,
23    pub usage: Option<serde_json::Value>,
24}
25
/// Schema regression tests.
///
/// Every test is annotated with `#[sqlx::test(migrations = "./migrations")]`
/// and receives a `PgPool`; the assertions then inspect the Postgres
/// catalogs (or call a repo) to pin the outcome of specific migrations.
#[cfg(test)]
mod migration_tests {
    use sqlx::PgPool;

    /// Reports whether row-level security is enabled on `qualified`, a
    /// `schema.table` name resolved through `regclass`.
    ///
    /// Panics (failing the calling test) if the relation does not exist or
    /// the query errors.
    async fn rls_enabled(pool: &PgPool, qualified: &str) -> bool {
        // `$1::text::regclass` pins the parameter to text so the name is
        // resolved server-side exactly like a quoted literal would be.
        sqlx::query_scalar::<_, bool>(
            "SELECT relrowsecurity FROM pg_class \
             WHERE oid = $1::text::regclass",
        )
        .bind(qualified)
        .fetch_one(pool)
        .await
        .unwrap_or_else(|e| panic!("query relrowsecurity for {qualified}: {e}"))
    }

    /// Reports whether table `engine.<table>` is listed in
    /// `information_schema.tables`.
    async fn engine_table_exists(pool: &PgPool, table: &str) -> bool {
        sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables \
             WHERE table_schema = 'engine' AND table_name = $1)",
        )
        .bind(table)
        .fetch_one(pool)
        .await
        .unwrap_or_else(|e| panic!("query information_schema.tables for engine.{table}: {e}"))
    }

    /// Reports whether column `engine.<table>.<column>` is listed in
    /// `information_schema.columns`.
    async fn engine_column_exists(pool: &PgPool, table: &str, column: &str) -> bool {
        sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS (SELECT 1 FROM information_schema.columns \
             WHERE table_schema = 'engine' AND table_name = $1 \
               AND column_name = $2)",
        )
        .bind(table)
        .bind(column)
        .fetch_one(pool)
        .await
        .unwrap_or_else(|e| {
            panic!("query information_schema.columns for engine.{table}.{column}: {e}")
        })
    }

    /// `engine.human_insights` must have row-level security switched on.
    #[sqlx::test(migrations = "./migrations")]
    async fn human_insights_has_rls_enabled(pool: PgPool) {
        assert!(
            rls_enabled(&pool, "engine.human_insights").await,
            "RLS must be enabled on engine.human_insights"
        );
    }

    /// The sqlx bookkeeping table must have row-level security switched on.
    #[sqlx::test(migrations = "./migrations")]
    async fn sqlx_migrations_has_rls_enabled(pool: PgPool) {
        assert!(
            rls_enabled(&pool, "public._sqlx_migrations").await,
            "RLS must be enabled on public._sqlx_migrations"
        );
    }

    /// `chat_messages.extracted_facts` must no longer exist.
    #[sqlx::test(migrations = "./migrations")]
    async fn chat_messages_has_no_extracted_facts_column(pool: PgPool) {
        assert!(
            !engine_column_exists(&pool, "chat_messages", "extracted_facts").await,
            "extracted_facts column must be dropped (migration 0017)"
        );
    }

    /// The fallback-phrase config row must hold a JSON array of exactly ten
    /// strings.
    #[sqlx::test(migrations = "./migrations")]
    async fn migration_0020_seeds_ten_fallback_phrases(pool: PgPool) {
        let payload: serde_json::Value = sqlx::query_scalar(
            "SELECT payload FROM engine.error_handling_config \
             WHERE kind = 'chat_stream_failure_fallback_phrases'",
        )
        .fetch_one(&pool)
        .await
        .expect("fetch seeded chat_stream_failure_fallback_phrases payload");
        let phrases = payload.as_array().expect("payload is an array");
        assert_eq!(phrases.len(), 10, "seed must carry exactly 10 phrases");
        for item in phrases {
            assert!(item.is_string(), "each phrase must be a string: {item}");
        }
    }

    /// The repo must return one of the seeded phrases when the config row is
    /// present.
    #[sqlx::test(migrations = "./migrations")]
    async fn pick_chat_stream_fallback_phrase_returns_seeded_phrase(pool: PgPool) {
        use crate::error_handling::ErrorHandlingRepo;
        let repo = ErrorHandlingRepo { pool: &pool };
        let phrase = repo
            .pick_chat_stream_fallback_phrase()
            .await
            .expect("pick_chat_stream_fallback_phrase must not error")
            .expect("seeded phrase should be available");
        let seeded = [
            "huh?",
            "hm?",
            "...",
            "oh?",
            "mhm",
            "ok",
            "👀",
            "😅",
            "say again?",
            "wait what?",
        ];
        assert!(
            seeded.contains(&phrase.as_str()),
            "picked {phrase:?} not in seed"
        );
    }

    /// The repo must return `None` (not an error) when the config row is
    /// absent.
    #[sqlx::test(migrations = "./migrations")]
    async fn pick_chat_stream_fallback_phrase_returns_none_when_kind_missing(pool: PgPool) {
        use crate::error_handling::ErrorHandlingRepo;
        let repo = ErrorHandlingRepo { pool: &pool };
        // Clear the seeded row to simulate a fresh DB without the kind.
        sqlx::query(
            "DELETE FROM engine.error_handling_config \
             WHERE kind = 'chat_stream_failure_fallback_phrases'",
        )
        .execute(&pool)
        .await
        .expect("delete seeded chat_stream_failure_fallback_phrases row");
        let phrase = repo
            .pick_chat_stream_fallback_phrase()
            .await
            .expect("pick_chat_stream_fallback_phrase must not error");
        assert!(phrase.is_none(), "expected None when config row absent");
    }

    /// Pins the shape of `engine.companion_insights_snapshot`: column names
    /// and order, `captured_at` nullability, the lookup index, and RLS.
    #[sqlx::test(migrations = "./migrations")]
    async fn companion_insights_snapshot_schema(pool: PgPool) {
        // Table exists with the five expected columns, in order. Column
        // types are not asserted here; only names and nullability are read.
        let cols: Vec<(String, String)> = sqlx::query_as(
            "SELECT column_name, is_nullable \
             FROM information_schema.columns \
             WHERE table_schema = 'engine' \
               AND table_name   = 'companion_insights_snapshot' \
             ORDER BY ordinal_position",
        )
        .fetch_all(&pool)
        .await
        .expect("query columns for companion_insights_snapshot");
        let names: Vec<&str> = cols.iter().map(|(name, _)| name.as_str()).collect();
        assert_eq!(
            names,
            ["id", "user_id", "insights", "training_level", "captured_at"],
            "column order/identity must match the migration"
        );
        // captured_at must be NOT NULL — sweeper sets it explicitly.
        let captured_at_nullable = cols
            .iter()
            .find(|(name, _)| name == "captured_at")
            .map(|(_, nullable)| nullable.as_str())
            .expect("captured_at column must be present");
        assert_eq!(captured_at_nullable, "NO", "captured_at must be NOT NULL");

        // Index on (user_id, captured_at DESC) exists.
        let idx_exists: bool = sqlx::query_scalar(
            "SELECT EXISTS ( \
                SELECT 1 FROM pg_indexes \
                 WHERE schemaname = 'engine' \
                   AND tablename  = 'companion_insights_snapshot' \
                   AND indexname  = 'idx_companion_insights_snapshot_user_time')",
        )
        .fetch_one(&pool)
        .await
        .expect("query pg_indexes");
        assert!(
            idx_exists,
            "idx_companion_insights_snapshot_user_time must be created by 0021"
        );

        // RLS enabled (no policy → server-side only access).
        assert!(
            rls_enabled(&pool, "engine.companion_insights_snapshot").await,
            "RLS must be enabled on companion_insights_snapshot"
        );
    }

    /// Migration 0023 removes the NFT ownership tables and
    /// `persona_genomes.asset_id`, while keeping `persona_genomes` itself.
    #[sqlx::test(migrations = "./migrations")]
    async fn migration_0023_drops_nft_ownership_stack(pool: PgPool) {
        // The three tables are gone.
        for tbl in ["wallet_links", "persona_ownership", "sync_cursors"] {
            assert!(
                !engine_table_exists(&pool, tbl).await,
                "engine.{tbl} must be dropped by migration 0023"
            );
        }
        // persona_genomes.asset_id is gone.
        assert!(
            !engine_column_exists(&pool, "persona_genomes", "asset_id").await,
            "persona_genomes.asset_id must be dropped"
        );
        // persona_genomes itself survives (sanity).
        assert!(
            engine_table_exists(&pool, "persona_genomes").await,
            "persona_genomes table must survive"
        );
    }

    /// Migration 0024 drops the non-chat columns from `persona_genomes` and
    /// leaves the chat-relevant ones in place.
    #[sqlx::test(migrations = "./migrations")]
    async fn migration_0024_strips_persona_genomes_to_chat_data(pool: PgPool) {
        for col in ["is_active", "avatar_url"] {
            assert!(
                !engine_column_exists(&pool, "persona_genomes", col).await,
                "persona_genomes.{col} must be dropped by migration 0024"
            );
        }
        // The chat-relevant columns survive.
        for col in ["name", "system_prompt", "tip_personality", "art_metadata"] {
            assert!(
                engine_column_exists(&pool, "persona_genomes", col).await,
                "persona_genomes.{col} must survive migration 0024"
            );
        }
    }
}
245            .unwrap();
246            assert!(exists, "persona_genomes.{col} must survive migration 0024");
247        }
248    }
249}