yarli_cli/yarli-store/src/
postgres.rs1use std::future::Future;
7use std::sync::Arc;
8use std::thread;
9use std::time::Instant;
10
11use crate::yarli_core::domain::{EntityType, Event, EventId};
12use crate::yarli_observability::YarliMetrics;
13use chrono::{DateTime, Utc};
14use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
15use sqlx::Row;
16use tokio::runtime::{Builder, Handle, RuntimeFlavor};
17use tracing::warn;
18
19use crate::yarli_store::error::StoreError;
20use crate::yarli_store::event_store::{EventQuery, EventStore};
21
22#[derive(Debug, Clone)]
24pub struct PostgresEventStore {
25 pool: PgPool,
26 metrics: Option<Arc<YarliMetrics>>,
27 #[cfg(feature = "chaos")]
28 chaos: Option<Arc<crate::yarli_chaos::ChaosController>>,
29}
30
31impl PostgresEventStore {
32 pub fn new(database_url: &str) -> Result<Self, StoreError> {
34 let pool = PgPoolOptions::new()
35 .connect_lazy(database_url)
36 .map_err(|error| StoreError::Database(error.to_string()))?;
37
38 Ok(Self {
39 pool,
40 metrics: None,
41 #[cfg(feature = "chaos")]
42 chaos: None,
43 })
44 }
45
46 pub fn from_pool(pool: PgPool) -> Self {
48 Self {
49 pool,
50 metrics: None,
51 #[cfg(feature = "chaos")]
52 chaos: None,
53 }
54 }
55
56 pub fn with_metrics(mut self, metrics: Arc<YarliMetrics>) -> Self {
58 self.metrics = Some(metrics);
59 self
60 }
61
62 #[cfg(feature = "chaos")]
63 pub fn with_chaos(mut self, chaos: Arc<crate::yarli_chaos::ChaosController>) -> Self {
65 self.chaos = Some(chaos);
66 self
67 }
68
69 pub fn pool(&self) -> &PgPool {
71 &self.pool
72 }
73
74 fn record_duration(&self, operation: &str, start: Instant) {
75 if let Some(metrics) = &self.metrics {
76 let duration = start.elapsed().as_secs_f64();
77 metrics.record_store_duration(operation, duration);
78 if duration > 1.0 {
79 metrics.record_store_slow_query(operation);
80 }
81 }
82 }
83
84 fn run_async<T, Fut>(&self, fut: Fut) -> Result<T, StoreError>
85 where
86 T: Send + 'static,
87 Fut: Future<Output = Result<T, StoreError>> + Send + 'static,
88 {
89 match Handle::try_current() {
90 Ok(handle) => match handle.runtime_flavor() {
91 RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| handle.block_on(fut)),
92 RuntimeFlavor::CurrentThread => thread::spawn(move || {
93 let runtime = Builder::new_current_thread()
94 .enable_all()
95 .build()
96 .map_err(|error| StoreError::Runtime(error.to_string()))?;
97 runtime.block_on(fut)
98 })
99 .join()
100 .map_err(|_| StoreError::Runtime("postgres operation panicked".to_string()))?,
101 _ => {
102 let runtime = Builder::new_current_thread()
103 .enable_all()
104 .build()
105 .map_err(|error| StoreError::Runtime(error.to_string()))?;
106 runtime.block_on(fut)
107 }
108 },
109 Err(_) => {
110 let runtime = Builder::new_current_thread()
111 .enable_all()
112 .build()
113 .map_err(|error| StoreError::Runtime(error.to_string()))?;
114 runtime.block_on(fut)
115 }
116 }
117 }
118}
119
120impl EventStore for PostgresEventStore {
121 fn append(&self, event: Event) -> Result<(), StoreError> {
122 let start = Instant::now();
123 let pool = self.pool.clone();
124 #[cfg(feature = "chaos")]
125 let chaos = self.chaos.clone();
126 let result = self.run_async(async move {
127 #[cfg(feature = "chaos")]
128 if let Some(chaos) = chaos {
129 chaos
130 .inject("store_append_event")
131 .await
132 .map_err(|e| StoreError::Runtime(e.to_string()))?;
133 }
134
135 let event_id = event.event_id;
136 let idempotency_key = event.idempotency_key.clone();
137 let entity_type = entity_type_to_db(event.entity_type);
138
139 let result = sqlx::query(
140 r#"
141 INSERT INTO events (
142 event_id,
143 occurred_at,
144 entity_type,
145 entity_id,
146 event_type,
147 payload,
148 correlation_id,
149 causation_id,
150 actor,
151 idempotency_key
152 )
153 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
154 "#,
155 )
156 .bind(event_id)
157 .bind(event.occurred_at)
158 .bind(entity_type)
159 .bind(event.entity_id)
160 .bind(event.event_type)
161 .bind(event.payload)
162 .bind(event.correlation_id)
163 .bind(event.causation_id)
164 .bind(event.actor)
165 .bind(event.idempotency_key)
166 .execute(&pool)
167 .await;
168
169 match result {
170 Ok(_) => Ok(()),
171 Err(sqlx::Error::Database(db_error))
172 if db_error.code().as_deref() == Some("23505") =>
173 {
174 match classify_unique_violation(db_error.constraint()) {
175 UniqueViolation::IdempotencyKey => {
176 let key = idempotency_key.unwrap_or_else(|| "<unknown>".to_string());
177 Err(StoreError::DuplicateIdempotencyKey(key))
178 }
179 UniqueViolation::EventId => Err(StoreError::DuplicateEventId(event_id)),
180 UniqueViolation::Unknown => {
181 Err(StoreError::Database(db_error.message().to_string()))
182 }
183 }
184 }
185 Err(error) => Err(StoreError::Database(error.to_string())),
186 }
187 });
188
189 self.record_duration("append", start);
190 result
191 }
192
193 fn get(&self, event_id: EventId) -> Result<Event, StoreError> {
194 let start = Instant::now();
195 let pool = self.pool.clone();
196 let result = self.run_async(async move {
197 let row = sqlx::query(
198 r#"
199 SELECT
200 event_id,
201 occurred_at,
202 entity_type,
203 entity_id,
204 event_type,
205 payload,
206 correlation_id,
207 causation_id,
208 actor,
209 idempotency_key
210 FROM events
211 WHERE event_id = $1
212 "#,
213 )
214 .bind(event_id)
215 .fetch_optional(&pool)
216 .await
217 .map_err(|error| StoreError::Database(error.to_string()))?;
218
219 match row {
220 Some(row) => row_to_event(row),
221 None => Err(StoreError::EventNotFound(event_id)),
222 }
223 });
224
225 self.record_duration("get", start);
226 result
227 }
228
229 fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
230 let start = Instant::now();
231 let pool = self.pool.clone();
232 let entity_type = query.entity_type.map(entity_type_to_db);
233 let entity_id = query.entity_id.clone();
234 let correlation_id = query.correlation_id;
235 let event_type = query.event_type.clone();
236 let limit = query.limit.map(|value| value.min(i64::MAX as usize) as i64);
237 let after_event_id = query.after_event_id;
238
239 let result = self.run_async(async move {
240 let after_occurred_at = match after_event_id {
241 Some(anchor_id) => {
242 let anchor = sqlx::query(
243 r#"
244 SELECT occurred_at
245 FROM events
246 WHERE event_id = $1
247 "#,
248 )
249 .bind(anchor_id)
250 .fetch_optional(&pool)
251 .await
252 .map_err(|error| StoreError::Database(error.to_string()))?;
253
254 match anchor {
255 Some(row) => Some(
256 row.try_get::<DateTime<Utc>, _>("occurred_at")
257 .map_err(|error| StoreError::Database(error.to_string()))?,
258 ),
259 None => return Err(StoreError::EventNotFound(anchor_id)),
260 }
261 }
262 None => None,
263 };
264
265 let rows = sqlx::query(
266 r#"
267 SELECT
268 event_id,
269 occurred_at,
270 entity_type,
271 entity_id,
272 event_type,
273 payload,
274 correlation_id,
275 causation_id,
276 actor,
277 idempotency_key
278 FROM events
279 WHERE
280 ($1::text IS NULL OR entity_type = $1)
281 AND ($2::text IS NULL OR entity_id = $2)
282 AND ($3::uuid IS NULL OR correlation_id = $3)
283 AND ($4::text IS NULL OR event_type = $4)
284 AND (
285 $5::timestamptz IS NULL
286 OR occurred_at > $5
287 OR (occurred_at = $5 AND event_id > $6::uuid)
288 )
289 ORDER BY occurred_at ASC, event_id ASC
290 LIMIT COALESCE($7::bigint, 9223372036854775807)
291 "#,
292 )
293 .bind(entity_type)
294 .bind(entity_id)
295 .bind(correlation_id)
296 .bind(event_type)
297 .bind(after_occurred_at)
298 .bind(after_event_id)
299 .bind(limit)
300 .fetch_all(&pool)
301 .await
302 .map_err(|error| StoreError::Database(error.to_string()))?;
303
304 rows.into_iter().map(row_to_event).collect()
305 });
306
307 self.record_duration("query", start);
308 result
309 }
310
311 fn all(&self) -> Result<Vec<Event>, StoreError> {
312 let start = Instant::now();
313 let pool = self.pool.clone();
314 let result = self.run_async(async move {
315 let rows = sqlx::query(
316 r#"
317 SELECT
318 event_id,
319 occurred_at,
320 entity_type,
321 entity_id,
322 event_type,
323 payload,
324 correlation_id,
325 causation_id,
326 actor,
327 idempotency_key
328 FROM events
329 ORDER BY created_at ASC, event_id ASC
330 "#,
331 )
332 .fetch_all(&pool)
333 .await
334 .map_err(|error| StoreError::Database(error.to_string()))?;
335
336 rows.into_iter().map(row_to_event).collect()
337 });
338
339 self.record_duration("all", start);
340 result
341 }
342
343 fn len(&self) -> usize {
344 let start = Instant::now();
345 let pool = self.pool.clone();
346 let result = match self.run_async(async move {
347 let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::bigint FROM events")
348 .fetch_one(&pool)
349 .await
350 .map_err(|error| StoreError::Database(error.to_string()))?;
351 Ok(count.max(0) as usize)
352 }) {
353 Ok(count) => count,
354 Err(error) => {
355 warn!(
356 error = %error,
357 "failed to compute event store length; returning zero"
358 );
359 0
360 }
361 };
362
363 self.record_duration("len", start);
364 result
365 }
366}
367
/// Which uniqueness rule of the `events` table an insert ran into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UniqueViolation {
    /// The primary-key constraint on the event id was violated.
    EventId,
    /// The unique idempotency-key constraint was violated.
    IdempotencyKey,
    /// The constraint name was missing or not one this store recognises.
    Unknown,
}
374
375fn classify_unique_violation(constraint: Option<&str>) -> UniqueViolation {
376 match constraint {
377 Some("ux_events_idempotency_key") => UniqueViolation::IdempotencyKey,
378 Some("events_pkey") => UniqueViolation::EventId,
379 _ => UniqueViolation::Unknown,
380 }
381}
382
383fn entity_type_to_db(entity_type: EntityType) -> &'static str {
384 match entity_type {
385 EntityType::Run => "run",
386 EntityType::Task => "task",
387 EntityType::Worktree => "worktree",
388 EntityType::Merge => "merge",
389 EntityType::Command => "command",
390 EntityType::Gate => "gate",
391 EntityType::Policy => "policy",
392 }
393}
394
395fn entity_type_from_db(value: &str) -> Result<EntityType, StoreError> {
396 match value {
397 "run" => Ok(EntityType::Run),
398 "task" => Ok(EntityType::Task),
399 "worktree" => Ok(EntityType::Worktree),
400 "merge" => Ok(EntityType::Merge),
401 "command" => Ok(EntityType::Command),
402 "gate" => Ok(EntityType::Gate),
403 "policy" => Ok(EntityType::Policy),
404 other => Err(StoreError::InvalidEntityType(other.to_string())),
405 }
406}
407
408fn row_to_event(row: PgRow) -> Result<Event, StoreError> {
409 let entity_type_raw: String = row
410 .try_get("entity_type")
411 .map_err(|error| StoreError::Database(error.to_string()))?;
412
413 Ok(Event {
414 event_id: row
415 .try_get("event_id")
416 .map_err(|error| StoreError::Database(error.to_string()))?,
417 occurred_at: row
418 .try_get("occurred_at")
419 .map_err(|error| StoreError::Database(error.to_string()))?,
420 entity_type: entity_type_from_db(&entity_type_raw)?,
421 entity_id: row
422 .try_get("entity_id")
423 .map_err(|error| StoreError::Database(error.to_string()))?,
424 event_type: row
425 .try_get("event_type")
426 .map_err(|error| StoreError::Database(error.to_string()))?,
427 payload: row
428 .try_get("payload")
429 .map_err(|error| StoreError::Database(error.to_string()))?,
430 correlation_id: row
431 .try_get("correlation_id")
432 .map_err(|error| StoreError::Database(error.to_string()))?,
433 causation_id: row
434 .try_get("causation_id")
435 .map_err(|error| StoreError::Database(error.to_string()))?,
436 actor: row
437 .try_get("actor")
438 .map_err(|error| StoreError::Database(error.to_string()))?,
439 idempotency_key: row
440 .try_get("idempotency_key")
441 .map_err(|error| StoreError::Database(error.to_string()))?,
442 })
443}
444
#[cfg(test)]
mod tests {
    use crate::yarli_core::domain::EntityType;

    use super::{
        classify_unique_violation, entity_type_from_db, entity_type_to_db, UniqueViolation,
    };

    /// Every entity type must survive encoding to its column label and back.
    #[test]
    fn entity_type_codec_round_trips() {
        let variants = [
            EntityType::Run,
            EntityType::Task,
            EntityType::Worktree,
            EntityType::Merge,
            EntityType::Command,
            EntityType::Gate,
            EntityType::Policy,
        ];

        for variant in variants {
            let encoded = entity_type_to_db(variant);
            let decoded = entity_type_from_db(encoded).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    /// Known constraint names map to their variant; anything else is unknown.
    #[test]
    fn classify_unique_violation_maps_known_constraints() {
        let cases = [
            (Some("events_pkey"), UniqueViolation::EventId),
            (
                Some("ux_events_idempotency_key"),
                UniqueViolation::IdempotencyKey,
            ),
            (Some("some_other_constraint"), UniqueViolation::Unknown),
            (None, UniqueViolation::Unknown),
        ];

        for (constraint, expected) in cases {
            assert_eq!(classify_unique_violation(constraint), expected);
        }
    }
}