1use crate::error::StorageError;
4use crate::pool::Storage;
5use chrono::{DateTime, Utc};
6use evolve_core::ids::{ConfigId, SessionId, SignalId};
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum SignalKind {
12 Explicit,
14 Implicit,
16}
17
18impl SignalKind {
19 fn as_str(self) -> &'static str {
20 match self {
21 Self::Explicit => "explicit",
22 Self::Implicit => "implicit",
23 }
24 }
25
26 fn from_str(s: &str) -> Result<Self, StorageError> {
27 Ok(match s {
28 "explicit" => Self::Explicit,
29 "implicit" => Self::Implicit,
30 other => {
31 return Err(StorageError::Sqlx(sqlx::Error::Decode(
32 format!("unknown signal kind {other:?}").into(),
33 )));
34 }
35 })
36 }
37}
38
39#[derive(Debug, Clone)]
41pub struct Signal {
42 pub id: SignalId,
44 pub session_id: SessionId,
46 pub kind: SignalKind,
48 pub source: String,
50 pub value: f64,
52 pub recorded_at: DateTime<Utc>,
54 pub payload_json: Option<String>,
56}
57
58type SignalRow = (String, String, String, String, f64, String, Option<String>);
61
62#[derive(Debug, Clone)]
64pub struct SignalRepo<'a> {
65 storage: &'a Storage,
66}
67
68impl<'a> SignalRepo<'a> {
69 pub fn new(storage: &'a Storage) -> Self {
71 Self { storage }
72 }
73
74 pub async fn insert(&self, s: &Signal) -> Result<(), StorageError> {
77 if let Some(payload) = s.payload_json.as_deref()
78 && looks_like_source_code(payload)
79 {
80 return Err(StorageError::PayloadRejected(
81 "payload contains code-like content",
82 ));
83 }
84 sqlx::query(
85 "INSERT INTO signals
86 (id, session_id, kind, source, value, recorded_at, payload_json)
87 VALUES (?, ?, ?, ?, ?, ?, ?)",
88 )
89 .bind(s.id.to_string())
90 .bind(s.session_id.to_string())
91 .bind(s.kind.as_str())
92 .bind(&s.source)
93 .bind(s.value)
94 .bind(s.recorded_at.to_rfc3339())
95 .bind(s.payload_json.as_deref())
96 .execute(self.storage.pool())
97 .await?;
98 Ok(())
99 }
100
101 pub async fn list_for_session(
103 &self,
104 session_id: SessionId,
105 ) -> Result<Vec<Signal>, StorageError> {
106 let rows: Vec<SignalRow> = sqlx::query_as(
107 "SELECT id, session_id, kind, source, value, recorded_at, payload_json
108 FROM signals
109 WHERE session_id = ?
110 ORDER BY recorded_at ASC",
111 )
112 .bind(session_id.to_string())
113 .fetch_all(self.storage.pool())
114 .await?;
115 rows.into_iter().map(row_to_signal).collect()
116 }
117
118 pub async fn list_for_config(&self, config_id: ConfigId) -> Result<Vec<Signal>, StorageError> {
120 let rows: Vec<SignalRow> = sqlx::query_as(
121 "SELECT sig.id, sig.session_id, sig.kind, sig.source, sig.value,
122 sig.recorded_at, sig.payload_json
123 FROM signals sig
124 JOIN sessions s ON s.id = sig.session_id
125 WHERE s.config_id = ?
126 ORDER BY sig.recorded_at ASC",
127 )
128 .bind(config_id.to_string())
129 .fetch_all(self.storage.pool())
130 .await?;
131 rows.into_iter().map(row_to_signal).collect()
132 }
133}
134
135fn looks_like_source_code(payload: &str) -> bool {
138 const BANNED: &[&str] = &[
139 "fn ",
140 "def ",
141 "class ",
142 "function ",
143 "=>",
144 "import ",
145 "#include",
146 "public class",
147 "console.log",
148 "println!",
149 "SELECT ",
150 "INSERT INTO",
151 ];
152 BANNED.iter().any(|needle| payload.contains(needle))
153}
154
155fn row_to_signal(
156 (id, session_id, kind, source, value, recorded_at, payload): SignalRow,
157) -> Result<Signal, StorageError> {
158 Ok(Signal {
159 id: SignalId::from_uuid(Uuid::parse_str(&id)?),
160 session_id: SessionId::from_uuid(Uuid::parse_str(&session_id)?),
161 kind: SignalKind::from_str(&kind)?,
162 source,
163 value,
164 recorded_at: DateTime::parse_from_rfc3339(&recorded_at)
165 .map_err(|e| StorageError::Sqlx(sqlx::Error::Decode(Box::new(e))))?
166 .with_timezone(&Utc),
167 payload_json: payload,
168 })
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use crate::agent_configs::{AgentConfigRepo, AgentConfigRow, ConfigRole};
175 use crate::projects::{Project, ProjectRepo};
176 use crate::sessions::{Session, SessionRepo, SessionVariant};
177 use evolve_core::agent_config::AgentConfig;
178 use evolve_core::ids::{AdapterId, ProjectId};
179
180 async fn seeded() -> (Storage, SessionId, ConfigId) {
181 let storage = Storage::in_memory_for_tests().await.unwrap();
182 let pid = ProjectId::new();
183 ProjectRepo::new(&storage)
184 .insert(&Project {
185 id: pid,
186 adapter_id: AdapterId::new("claude-code"),
187 root_path: "/tmp/signals-test".into(),
188 name: "g".into(),
189 created_at: Utc::now(),
190 champion_config_id: None,
191 })
192 .await
193 .unwrap();
194
195 let cfg = AgentConfigRow {
196 id: ConfigId::new(),
197 project_id: pid,
198 adapter_id: AdapterId::new("claude-code"),
199 role: ConfigRole::Champion,
200 fingerprint: 1,
201 payload: AgentConfig::default_for("claude-code"),
202 created_at: Utc::now(),
203 };
204 AgentConfigRepo::new(&storage).insert(&cfg).await.unwrap();
205
206 let sid = SessionId::new();
207 SessionRepo::new(&storage)
208 .insert(&Session {
209 id: sid,
210 project_id: pid,
211 experiment_id: None,
212 variant: SessionVariant::Champion,
213 config_id: cfg.id,
214 started_at: Utc::now(),
215 ended_at: Utc::now(),
216 adapter_session_ref: None,
217 })
218 .await
219 .unwrap();
220 (storage, sid, cfg.id)
221 }
222
223 #[tokio::test]
224 async fn insert_then_list_for_session_roundtrips() {
225 let (storage, sid, _cfg) = seeded().await;
226 let repo = SignalRepo::new(&storage);
227 let sig = Signal {
228 id: SignalId::new(),
229 session_id: sid,
230 kind: SignalKind::Implicit,
231 source: "tests_passed".into(),
232 value: 1.0,
233 recorded_at: Utc::now(),
234 payload_json: Some("{\"exit_code\":0}".into()),
235 };
236 repo.insert(&sig).await.unwrap();
237 let got = repo.list_for_session(sid).await.unwrap();
238 assert_eq!(got.len(), 1);
239 assert_eq!(got[0].source, "tests_passed");
240 assert_eq!(got[0].value, 1.0);
241 }
242
243 #[tokio::test]
244 async fn list_for_config_joins_via_sessions() {
245 let (storage, sid, cfg) = seeded().await;
246 let repo = SignalRepo::new(&storage);
247 for (src, val) in [("a", 1.0), ("b", 0.0)] {
248 repo.insert(&Signal {
249 id: SignalId::new(),
250 session_id: sid,
251 kind: SignalKind::Explicit,
252 source: src.into(),
253 value: val,
254 recorded_at: Utc::now(),
255 payload_json: None,
256 })
257 .await
258 .unwrap();
259 }
260 let got = repo.list_for_config(cfg).await.unwrap();
261 assert_eq!(got.len(), 2);
262 }
263
264 #[tokio::test]
265 async fn payload_json_never_contains_code_like_content() {
266 let (storage, sid, _cfg) = seeded().await;
267 let repo = SignalRepo::new(&storage);
268 let err = repo
269 .insert(&Signal {
270 id: SignalId::new(),
271 session_id: sid,
272 kind: SignalKind::Implicit,
273 source: "suspicious".into(),
274 value: 0.5,
275 recorded_at: Utc::now(),
276 payload_json: Some("fn main() { let x = 1; }".into()),
277 })
278 .await
279 .unwrap_err();
280 assert!(matches!(err, StorageError::PayloadRejected(_)));
281 }
282
283 #[tokio::test]
284 async fn insert_rejects_value_outside_unit_interval() {
285 let (storage, sid, _cfg) = seeded().await;
286 let repo = SignalRepo::new(&storage);
287 let err = repo
288 .insert(&Signal {
289 id: SignalId::new(),
290 session_id: sid,
291 kind: SignalKind::Implicit,
292 source: "bad_value".into(),
293 value: 1.5,
294 recorded_at: Utc::now(),
295 payload_json: None,
296 })
297 .await
298 .unwrap_err();
299 assert!(matches!(err, StorageError::Sqlx(sqlx::Error::Database(_))));
300 }
301}