Skip to main content

yarli_cli/yarli-store/src/
postgres.rs

1//! Postgres-backed event store implementation.
2//!
3//! Implements `EventStore` semantics over the `events` table from migration
4//! `0001_init.sql`.
5
6use std::future::Future;
7use std::sync::Arc;
8use std::thread;
9use std::time::Instant;
10
11use crate::yarli_core::domain::{EntityType, Event, EventId};
12use crate::yarli_observability::YarliMetrics;
13use chrono::{DateTime, Utc};
14use sqlx::postgres::{PgPool, PgPoolOptions, PgRow};
15use sqlx::Row;
16use tokio::runtime::{Builder, Handle, RuntimeFlavor};
17use tracing::warn;
18
19use crate::yarli_store::error::StoreError;
20use crate::yarli_store::event_store::{EventQuery, EventStore};
21
22/// Postgres-backed event store.
23#[derive(Debug, Clone)]
24pub struct PostgresEventStore {
25    pool: PgPool,
26    metrics: Option<Arc<YarliMetrics>>,
27    #[cfg(feature = "chaos")]
28    chaos: Option<Arc<crate::yarli_chaos::ChaosController>>,
29}
30
31impl PostgresEventStore {
32    /// Create a store backed by a lazily-connected `PgPool`.
33    pub fn new(database_url: &str) -> Result<Self, StoreError> {
34        let pool = PgPoolOptions::new()
35            .connect_lazy(database_url)
36            .map_err(|error| StoreError::Database(error.to_string()))?;
37
38        Ok(Self {
39            pool,
40            metrics: None,
41            #[cfg(feature = "chaos")]
42            chaos: None,
43        })
44    }
45
46    /// Construct from an existing pool.
47    pub fn from_pool(pool: PgPool) -> Self {
48        Self {
49            pool,
50            metrics: None,
51            #[cfg(feature = "chaos")]
52            chaos: None,
53        }
54    }
55
56    /// Attach metrics registry for telemetry.
57    pub fn with_metrics(mut self, metrics: Arc<YarliMetrics>) -> Self {
58        self.metrics = Some(metrics);
59        self
60    }
61
62    #[cfg(feature = "chaos")]
63    /// Configure chaos controller for fault injection.
64    pub fn with_chaos(mut self, chaos: Arc<crate::yarli_chaos::ChaosController>) -> Self {
65        self.chaos = Some(chaos);
66        self
67    }
68
69    /// Access the underlying pool.
70    pub fn pool(&self) -> &PgPool {
71        &self.pool
72    }
73
74    fn record_duration(&self, operation: &str, start: Instant) {
75        if let Some(metrics) = &self.metrics {
76            let duration = start.elapsed().as_secs_f64();
77            metrics.record_store_duration(operation, duration);
78            if duration > 1.0 {
79                metrics.record_store_slow_query(operation);
80            }
81        }
82    }
83
84    fn run_async<T, Fut>(&self, fut: Fut) -> Result<T, StoreError>
85    where
86        T: Send + 'static,
87        Fut: Future<Output = Result<T, StoreError>> + Send + 'static,
88    {
89        match Handle::try_current() {
90            Ok(handle) => match handle.runtime_flavor() {
91                RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| handle.block_on(fut)),
92                RuntimeFlavor::CurrentThread => thread::spawn(move || {
93                    let runtime = Builder::new_current_thread()
94                        .enable_all()
95                        .build()
96                        .map_err(|error| StoreError::Runtime(error.to_string()))?;
97                    runtime.block_on(fut)
98                })
99                .join()
100                .map_err(|_| StoreError::Runtime("postgres operation panicked".to_string()))?,
101                _ => {
102                    let runtime = Builder::new_current_thread()
103                        .enable_all()
104                        .build()
105                        .map_err(|error| StoreError::Runtime(error.to_string()))?;
106                    runtime.block_on(fut)
107                }
108            },
109            Err(_) => {
110                let runtime = Builder::new_current_thread()
111                    .enable_all()
112                    .build()
113                    .map_err(|error| StoreError::Runtime(error.to_string()))?;
114                runtime.block_on(fut)
115            }
116        }
117    }
118}
119
120impl EventStore for PostgresEventStore {
121    fn append(&self, event: Event) -> Result<(), StoreError> {
122        let start = Instant::now();
123        let pool = self.pool.clone();
124        #[cfg(feature = "chaos")]
125        let chaos = self.chaos.clone();
126        let result = self.run_async(async move {
127            #[cfg(feature = "chaos")]
128            if let Some(chaos) = chaos {
129                chaos
130                    .inject("store_append_event")
131                    .await
132                    .map_err(|e| StoreError::Runtime(e.to_string()))?;
133            }
134
135            let event_id = event.event_id;
136            let idempotency_key = event.idempotency_key.clone();
137            let entity_type = entity_type_to_db(event.entity_type);
138
139            let result = sqlx::query(
140                r#"
141                INSERT INTO events (
142                    event_id,
143                    occurred_at,
144                    entity_type,
145                    entity_id,
146                    event_type,
147                    payload,
148                    correlation_id,
149                    causation_id,
150                    actor,
151                    idempotency_key
152                )
153                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
154                "#,
155            )
156            .bind(event_id)
157            .bind(event.occurred_at)
158            .bind(entity_type)
159            .bind(event.entity_id)
160            .bind(event.event_type)
161            .bind(event.payload)
162            .bind(event.correlation_id)
163            .bind(event.causation_id)
164            .bind(event.actor)
165            .bind(event.idempotency_key)
166            .execute(&pool)
167            .await;
168
169            match result {
170                Ok(_) => Ok(()),
171                Err(sqlx::Error::Database(db_error))
172                    if db_error.code().as_deref() == Some("23505") =>
173                {
174                    match classify_unique_violation(db_error.constraint()) {
175                        UniqueViolation::IdempotencyKey => {
176                            let key = idempotency_key.unwrap_or_else(|| "<unknown>".to_string());
177                            Err(StoreError::DuplicateIdempotencyKey(key))
178                        }
179                        UniqueViolation::EventId => Err(StoreError::DuplicateEventId(event_id)),
180                        UniqueViolation::Unknown => {
181                            Err(StoreError::Database(db_error.message().to_string()))
182                        }
183                    }
184                }
185                Err(error) => Err(StoreError::Database(error.to_string())),
186            }
187        });
188
189        self.record_duration("append", start);
190        result
191    }
192
193    fn get(&self, event_id: EventId) -> Result<Event, StoreError> {
194        let start = Instant::now();
195        let pool = self.pool.clone();
196        let result = self.run_async(async move {
197            let row = sqlx::query(
198                r#"
199                SELECT
200                    event_id,
201                    occurred_at,
202                    entity_type,
203                    entity_id,
204                    event_type,
205                    payload,
206                    correlation_id,
207                    causation_id,
208                    actor,
209                    idempotency_key
210                FROM events
211                WHERE event_id = $1
212                "#,
213            )
214            .bind(event_id)
215            .fetch_optional(&pool)
216            .await
217            .map_err(|error| StoreError::Database(error.to_string()))?;
218
219            match row {
220                Some(row) => row_to_event(row),
221                None => Err(StoreError::EventNotFound(event_id)),
222            }
223        });
224
225        self.record_duration("get", start);
226        result
227    }
228
229    fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
230        let start = Instant::now();
231        let pool = self.pool.clone();
232        let entity_type = query.entity_type.map(entity_type_to_db);
233        let entity_id = query.entity_id.clone();
234        let correlation_id = query.correlation_id;
235        let event_type = query.event_type.clone();
236        let limit = query.limit.map(|value| value.min(i64::MAX as usize) as i64);
237        let after_event_id = query.after_event_id;
238
239        let result = self.run_async(async move {
240            let after_occurred_at = match after_event_id {
241                Some(anchor_id) => {
242                    let anchor = sqlx::query(
243                        r#"
244                        SELECT occurred_at
245                        FROM events
246                        WHERE event_id = $1
247                        "#,
248                    )
249                    .bind(anchor_id)
250                    .fetch_optional(&pool)
251                    .await
252                    .map_err(|error| StoreError::Database(error.to_string()))?;
253
254                    match anchor {
255                        Some(row) => Some(
256                            row.try_get::<DateTime<Utc>, _>("occurred_at")
257                                .map_err(|error| StoreError::Database(error.to_string()))?,
258                        ),
259                        None => return Err(StoreError::EventNotFound(anchor_id)),
260                    }
261                }
262                None => None,
263            };
264
265            let rows = sqlx::query(
266                r#"
267                SELECT
268                    event_id,
269                    occurred_at,
270                    entity_type,
271                    entity_id,
272                    event_type,
273                    payload,
274                    correlation_id,
275                    causation_id,
276                    actor,
277                    idempotency_key
278                FROM events
279                WHERE
280                    ($1::text IS NULL OR entity_type = $1)
281                    AND ($2::text IS NULL OR entity_id = $2)
282                    AND ($3::uuid IS NULL OR correlation_id = $3)
283                    AND ($4::text IS NULL OR event_type = $4)
284                    AND (
285                        $5::timestamptz IS NULL
286                        OR occurred_at > $5
287                        OR (occurred_at = $5 AND event_id > $6::uuid)
288                    )
289                ORDER BY occurred_at ASC, event_id ASC
290                LIMIT COALESCE($7::bigint, 9223372036854775807)
291                "#,
292            )
293            .bind(entity_type)
294            .bind(entity_id)
295            .bind(correlation_id)
296            .bind(event_type)
297            .bind(after_occurred_at)
298            .bind(after_event_id)
299            .bind(limit)
300            .fetch_all(&pool)
301            .await
302            .map_err(|error| StoreError::Database(error.to_string()))?;
303
304            rows.into_iter().map(row_to_event).collect()
305        });
306
307        self.record_duration("query", start);
308        result
309    }
310
311    fn all(&self) -> Result<Vec<Event>, StoreError> {
312        let start = Instant::now();
313        let pool = self.pool.clone();
314        let result = self.run_async(async move {
315            let rows = sqlx::query(
316                r#"
317                SELECT
318                    event_id,
319                    occurred_at,
320                    entity_type,
321                    entity_id,
322                    event_type,
323                    payload,
324                    correlation_id,
325                    causation_id,
326                    actor,
327                    idempotency_key
328                FROM events
329                ORDER BY created_at ASC, event_id ASC
330                "#,
331            )
332            .fetch_all(&pool)
333            .await
334            .map_err(|error| StoreError::Database(error.to_string()))?;
335
336            rows.into_iter().map(row_to_event).collect()
337        });
338
339        self.record_duration("all", start);
340        result
341    }
342
343    fn len(&self) -> usize {
344        let start = Instant::now();
345        let pool = self.pool.clone();
346        let result = match self.run_async(async move {
347            let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*)::bigint FROM events")
348                .fetch_one(&pool)
349                .await
350                .map_err(|error| StoreError::Database(error.to_string()))?;
351            Ok(count.max(0) as usize)
352        }) {
353            Ok(count) => count,
354            Err(error) => {
355                warn!(
356                    error = %error,
357                    "failed to compute event store length; returning zero"
358                );
359                0
360            }
361        };
362
363        self.record_duration("len", start);
364        result
365    }
366}
367
/// Which uniqueness rule of the `events` table an insert ran into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UniqueViolation {
    /// The primary key (`events_pkey`) was violated.
    EventId,
    /// The idempotency-key index (`ux_events_idempotency_key`) was violated.
    IdempotencyKey,
    /// The constraint name was missing or not one of the recognised ones.
    Unknown,
}

/// Map the constraint name reported by the database to a [`UniqueViolation`].
///
/// A missing name and any unrecognised name both map to
/// `UniqueViolation::Unknown`.
fn classify_unique_violation(constraint: Option<&str>) -> UniqueViolation {
    let name = match constraint {
        Some(name) => name,
        None => return UniqueViolation::Unknown,
    };

    if name == "events_pkey" {
        UniqueViolation::EventId
    } else if name == "ux_events_idempotency_key" {
        UniqueViolation::IdempotencyKey
    } else {
        UniqueViolation::Unknown
    }
}
382
383fn entity_type_to_db(entity_type: EntityType) -> &'static str {
384    match entity_type {
385        EntityType::Run => "run",
386        EntityType::Task => "task",
387        EntityType::Worktree => "worktree",
388        EntityType::Merge => "merge",
389        EntityType::Command => "command",
390        EntityType::Gate => "gate",
391        EntityType::Policy => "policy",
392    }
393}
394
395fn entity_type_from_db(value: &str) -> Result<EntityType, StoreError> {
396    match value {
397        "run" => Ok(EntityType::Run),
398        "task" => Ok(EntityType::Task),
399        "worktree" => Ok(EntityType::Worktree),
400        "merge" => Ok(EntityType::Merge),
401        "command" => Ok(EntityType::Command),
402        "gate" => Ok(EntityType::Gate),
403        "policy" => Ok(EntityType::Policy),
404        other => Err(StoreError::InvalidEntityType(other.to_string())),
405    }
406}
407
408fn row_to_event(row: PgRow) -> Result<Event, StoreError> {
409    let entity_type_raw: String = row
410        .try_get("entity_type")
411        .map_err(|error| StoreError::Database(error.to_string()))?;
412
413    Ok(Event {
414        event_id: row
415            .try_get("event_id")
416            .map_err(|error| StoreError::Database(error.to_string()))?,
417        occurred_at: row
418            .try_get("occurred_at")
419            .map_err(|error| StoreError::Database(error.to_string()))?,
420        entity_type: entity_type_from_db(&entity_type_raw)?,
421        entity_id: row
422            .try_get("entity_id")
423            .map_err(|error| StoreError::Database(error.to_string()))?,
424        event_type: row
425            .try_get("event_type")
426            .map_err(|error| StoreError::Database(error.to_string()))?,
427        payload: row
428            .try_get("payload")
429            .map_err(|error| StoreError::Database(error.to_string()))?,
430        correlation_id: row
431            .try_get("correlation_id")
432            .map_err(|error| StoreError::Database(error.to_string()))?,
433        causation_id: row
434            .try_get("causation_id")
435            .map_err(|error| StoreError::Database(error.to_string()))?,
436        actor: row
437            .try_get("actor")
438            .map_err(|error| StoreError::Database(error.to_string()))?,
439        idempotency_key: row
440            .try_get("idempotency_key")
441            .map_err(|error| StoreError::Database(error.to_string()))?,
442    })
443}
444
#[cfg(test)]
mod tests {
    use crate::yarli_core::domain::EntityType;

    use super::{
        classify_unique_violation, entity_type_from_db, entity_type_to_db, UniqueViolation,
    };

    /// Every entity type must survive encoding to its database label and
    /// decoding back.
    #[test]
    fn entity_type_codec_round_trips() {
        for original in [
            EntityType::Run,
            EntityType::Task,
            EntityType::Worktree,
            EntityType::Merge,
            EntityType::Command,
            EntityType::Gate,
            EntityType::Policy,
        ] {
            let label = entity_type_to_db(original);
            let decoded = entity_type_from_db(label).unwrap();
            assert_eq!(decoded, original);
        }
    }

    /// Known constraint names map to their variants; anything else,
    /// including a missing name, is `Unknown`.
    #[test]
    fn classify_unique_violation_maps_known_constraints() {
        let cases = [
            (Some("events_pkey"), UniqueViolation::EventId),
            (
                Some("ux_events_idempotency_key"),
                UniqueViolation::IdempotencyKey,
            ),
            (Some("some_other_constraint"), UniqueViolation::Unknown),
            (None, UniqueViolation::Unknown),
        ];

        for (constraint, expected) in cases {
            assert_eq!(classify_unique_violation(constraint), expected);
        }
    }
}