1use std::ops::{Deref, DerefMut};
4
5#[cfg(feature = "mysql")]
6use sea_query::MysqlQueryBuilder;
7#[cfg(feature = "postgres")]
8use sea_query::PostgresQueryBuilder;
9#[cfg(feature = "sqlite")]
10use sea_query::SqliteQueryBuilder;
11use sea_query::{Cond, Expr, ExprTrait, Iden, IntoColumnRef, OnConflict, Query, SelectStatement};
12use sea_query_sqlx::SqlxBinder;
13use sqlx::{Database, Pool};
14use ulid::Ulid;
15
16use evento_core::{
17 cursor::{self, Args, Cursor, Edge, PageInfo, ReadResult, Value},
18 Executor, ReadAggregator, WriteError,
19};
20
/// `sea_query` identifiers for the event table and its columns.
///
/// The `Iden` derive turns each variant into a snake_case SQL identifier
/// (e.g. `AggregatorType` -> `aggregator_type`); `Table` names the table
/// itself.
#[derive(Iden, Clone)]
pub enum Event {
    Table,
    Id,
    Name,
    AggregatorType,
    AggregatorId,
    Version,
    Data,
    Metadata,
    RoutingKey,
    // Cursor-ordering columns: seconds plus a sub-second component, see
    // `impl Bind for evento_core::Event` below.
    Timestamp,
    TimestampSubsec,
}
62
/// `sea_query` identifiers for the snapshot table and its columns.
///
/// NOTE(review): not referenced elsewhere in this file — presumably used by
/// snapshot persistence in another module; confirm before changing.
#[derive(Iden)]
pub enum Snapshot {
    Table,
    Id,
    Type,
    Cursor,
    Revision,
    Data,
    CreatedAt,
    UpdatedAt,
}
87
/// `sea_query` identifiers for the subscriber table and its columns.
///
/// One row per subscription `Key`, tracking which worker currently owns it
/// (`WorkerId`), the last acknowledged cursor (`Cursor`), how far behind it
/// is (`Lag`) and whether it is enabled — see `upsert_subscriber`,
/// `acknowledge` and `is_subscriber_running` below.
#[derive(Iden)]
pub enum Subscriber {
    Table,
    Key,
    WorkerId,
    Cursor,
    Lag,
    Enabled,
    CreatedAt,
    UpdatedAt,
}
119
/// [`Sql`] executor backed by a MySQL connection pool.
#[cfg(feature = "mysql")]
pub type MySql = Sql<sqlx::MySql>;

/// Read/write executor pair where MySQL serves both sides.
#[cfg(feature = "mysql")]
pub type RwMySql = evento_core::Rw<MySql, MySql>;

/// [`Sql`] executor backed by a PostgreSQL connection pool.
#[cfg(feature = "postgres")]
pub type Postgres = Sql<sqlx::Postgres>;

/// Read/write executor pair where PostgreSQL serves both sides.
#[cfg(feature = "postgres")]
pub type RwPostgres = evento_core::Rw<Postgres, Postgres>;

/// [`Sql`] executor backed by a SQLite connection pool.
#[cfg(feature = "sqlite")]
pub type Sqlite = Sql<sqlx::Sqlite>;

/// Read/write executor pair where SQLite serves both sides.
#[cfg(feature = "sqlite")]
pub type RwSqlite = evento_core::Rw<Sqlite, Sqlite>;
156pub struct Sql<DB: Database>(Pool<DB>);
194
impl<DB: Database> Sql<DB> {
    /// Renders `statement` into SQL text plus bound values for the active
    /// driver, dispatching on [`Database::NAME`] at runtime.
    ///
    /// # Panics
    ///
    /// Panics when the driver is not one of SQLite/MySQL/PostgreSQL or its
    /// cargo feature is disabled (the `#[cfg]` arm is then compiled out).
    fn build_sqlx<S: SqlxBinder>(statement: S) -> (String, sea_query_sqlx::SqlxValues) {
        match DB::NAME {
            #[cfg(feature = "sqlite")]
            "SQLite" => statement.build_sqlx(SqliteQueryBuilder),
            #[cfg(feature = "mysql")]
            "MySQL" => statement.build_sqlx(MysqlQueryBuilder),
            #[cfg(feature = "postgres")]
            "PostgreSQL" => statement.build_sqlx(PostgresQueryBuilder),
            name => panic!("'{name}' not supported, consider using SQLite, PostgreSQL or MySQL"),
        }
    }
}
208
/// [`Executor`] implementation for any sqlx driver whose argument/row types
/// satisfy the bounds below (bind sea-query values, decode the column types
/// used here, and map rows to [`evento_core::Event`]).
#[async_trait::async_trait]
impl<DB> Executor for Sql<DB>
where
    DB: Database,
    for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
    sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
    String: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
    bool: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
    Vec<u8>: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
    usize: sqlx::ColumnIndex<DB::Row>,
    evento_core::Event: for<'r> sqlx::FromRow<'r, DB::Row>,
{
    /// Reads one page of events via cursor pagination (`args`), optionally
    /// filtered by aggregator descriptors and/or routing key.
    async fn read(
        &self,
        aggregators: Option<Vec<ReadAggregator>>,
        routing_key: Option<evento_core::RoutingKey>,
        args: Args,
    ) -> anyhow::Result<ReadResult<evento_core::Event>> {
        let statement = Query::select()
            .columns([
                Event::Id,
                Event::Name,
                Event::AggregatorType,
                Event::AggregatorId,
                Event::Version,
                Event::Data,
                Event::Metadata,
                Event::RoutingKey,
                Event::Timestamp,
                Event::TimestampSubsec,
            ])
            .from(Event::Table)
            .conditions(
                aggregators.is_some(),
                |q| {
                    let Some(aggregators) = aggregators else {
                        return;
                    };

                    // OR the descriptors together: a row matches if it
                    // satisfies any one aggregator filter.
                    let mut cond = Cond::any();

                    for aggregator in aggregators {
                        // Within a descriptor, every provided field must match.
                        let mut aggregator_cond = Cond::all()
                            .add(Expr::col(Event::AggregatorType).eq(aggregator.aggregator_type));

                        if let Some(id) = aggregator.aggregator_id {
                            aggregator_cond =
                                aggregator_cond.add(Expr::col(Event::AggregatorId).eq(id));
                        }

                        if let Some(name) = aggregator.name {
                            aggregator_cond = aggregator_cond.add(Expr::col(Event::Name).eq(name));
                        }

                        cond = cond.add(aggregator_cond);
                    }

                    q.and_where(cond.into());
                },
                |_| {},
            )
            .conditions(
                // Only the `RoutingKey::Value(..)` variant filters; any other
                // variant leaves the query unfiltered by routing key.
                matches!(routing_key, Some(evento_core::RoutingKey::Value(_))),
                |q| {
                    if let Some(evento_core::RoutingKey::Value(Some(ref routing_key))) = routing_key
                    {
                        q.and_where(Expr::col(Event::RoutingKey).eq(routing_key));
                    }

                    // `Value(None)` explicitly selects rows with no routing key.
                    if let Some(evento_core::RoutingKey::Value(None)) = routing_key {
                        q.and_where(Expr::col(Event::RoutingKey).is_null());
                    }
                },
                |_q| {},
            )
            .to_owned();

        Ok(Reader::new(statement)
            .args(args)
            .execute::<_, evento_core::Event, _>(&self.0)
            .await?)
    }

    /// Returns the stored cursor for subscriber `key`; `None` when the row
    /// does not exist or its cursor column is NULL.
    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
        let statement = Query::select()
            .columns([Subscriber::Cursor])
            .from(Subscriber::Table)
            .and_where(Expr::col(Subscriber::Key).eq(Expr::value(key)))
            .limit(1)
            .to_owned();

        let (sql, values) = Self::build_sqlx(statement);

        // No row at all -> subscriber unknown -> `None`.
        let Some((cursor,)) = sqlx::query_as_with::<DB, (Option<String>,), _>(&sql, values)
            .fetch_optional(&self.0)
            .await?
        else {
            return Ok(None);
        };

        Ok(cursor.map(|c| c.into()))
    }

    /// True when `worker_id` currently owns subscription `key` and the
    /// subscription is enabled.
    ///
    /// NOTE(review): `fetch_one` returns an error if no subscriber row exists
    /// — presumably `upsert_subscriber` always runs first; confirm callers.
    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
        let statement = Query::select()
            .columns([Subscriber::WorkerId, Subscriber::Enabled])
            .from(Subscriber::Table)
            .and_where(Expr::col(Subscriber::Key).eq(Expr::value(key)))
            .limit(1)
            .to_owned();

        let (sql, values) = Self::build_sqlx(statement);

        let (id, enabled) = sqlx::query_as_with::<DB, (String, bool), _>(&sql, values)
            .fetch_one(&self.0)
            .await?;

        Ok(worker_id.to_string() == id && enabled)
    }

    /// Inserts the subscriber row for `key`, or — on key conflict — takes
    /// ownership by overwriting `worker_id` and refreshing `updated_at`.
    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
        let statement = Query::insert()
            .into_table(Subscriber::Table)
            .columns([Subscriber::Key, Subscriber::WorkerId, Subscriber::Lag])
            .values_panic([key.into(), worker_id.to_string().into(), 0.into()])
            .on_conflict(
                OnConflict::column(Subscriber::Key)
                    .update_columns([Subscriber::WorkerId])
                    .value(Subscriber::UpdatedAt, Expr::current_timestamp())
                    .to_owned(),
            )
            .to_owned();

        let (sql, values) = Self::build_sqlx(statement);

        sqlx::query_with::<DB, _>(&sql, values)
            .execute(&self.0)
            .await?;

        Ok(())
    }

    /// Appends `events` in a single multi-row INSERT.
    ///
    /// A unique-constraint violation is mapped to
    /// [`WriteError::InvalidOriginalVersion`] (optimistic-concurrency
    /// conflict on the aggregator/version key); anything else becomes
    /// [`WriteError::Unknown`].
    async fn write(&self, events: Vec<evento_core::Event>) -> Result<(), WriteError> {
        let mut statement = Query::insert()
            .into_table(Event::Table)
            .columns([
                Event::Id,
                Event::Name,
                Event::Data,
                Event::Metadata,
                Event::AggregatorType,
                Event::AggregatorId,
                Event::Version,
                Event::RoutingKey,
                Event::Timestamp,
                Event::TimestampSubsec,
            ])
            .to_owned();

        for event in events {
            statement.values_panic([
                event.id.to_string().into(),
                event.name.into(),
                event.data.into(),
                event.metadata.into(),
                event.aggregator_type.into(),
                event.aggregator_id.into(),
                event.version.into(),
                event.routing_key.into(),
                event.timestamp.into(),
                event.timestamp_subsec.into(),
            ]);
        }

        let (sql, values) = Self::build_sqlx(statement);

        // NOTE(review): conflict detection relies on driver error-message
        // strings, which is brittle across driver versions; sqlx's
        // `DatabaseError::code()` would be sturdier — verify before changing.
        sqlx::query_with::<DB, _>(&sql, values)
            .execute(&self.0)
            .await
            .map_err(|err| {
                let err_str = err.to_string();
                // SQLite: extended code 2067 = SQLITE_CONSTRAINT_UNIQUE.
                if err_str.contains("(code: 2067)") {
                    return WriteError::InvalidOriginalVersion;
                }
                // MySQL: 1062 = ER_DUP_ENTRY (SQLSTATE 23000).
                if err_str.contains("1062 (23000): Duplicate entry") {
                    return WriteError::InvalidOriginalVersion;
                }
                // PostgreSQL: unique_violation message text.
                if err_str.contains("duplicate key value violates unique constraint") {
                    return WriteError::InvalidOriginalVersion;
                }
                WriteError::Unknown(err.into())
            })?;

        Ok(())
    }

    /// Stores the processed position for subscriber `key`: cursor, lag and a
    /// fresh `updated_at`.
    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
        let statement = Query::update()
            .table(Subscriber::Table)
            .values([
                // `cursor.0` is the inner serialized cursor value.
                (Subscriber::Cursor, cursor.0.into()),
                (Subscriber::Lag, lag.into()),
                (Subscriber::UpdatedAt, Expr::current_timestamp()),
            ])
            .and_where(Expr::col(Subscriber::Key).eq(key))
            .to_owned();

        let (sql, values) = Self::build_sqlx(statement);

        sqlx::query_with::<DB, _>(&sql, values)
            .execute(&self.0)
            .await?;

        Ok(())
    }
}
425
426impl<D: Database> Clone for Sql<D> {
427 fn clone(&self) -> Self {
428 Self(self.0.clone())
429 }
430}
431
432impl<D: Database> From<Pool<D>> for Sql<D> {
433 fn from(value: Pool<D>) -> Self {
434 Self(value)
435 }
436}
437
/// Cursor-pagination wrapper around a sea-query [`SelectStatement`].
///
/// Adds keyset WHERE/ORDER BY/LIMIT clauses derived from `args` and `order`
/// before executing; derefs to the inner statement for further tweaking.
pub struct Reader {
    // Base query; pagination clauses are appended in place at execute time.
    statement: SelectStatement,
    // Relay-style pagination arguments (first/after or last/before).
    args: Args,
    // Requested logical order of the result set.
    order: cursor::Order,
}
483
484impl Reader {
485 pub fn new(statement: SelectStatement) -> Self {
487 Self {
488 statement,
489 args: Args::default(),
490 order: cursor::Order::Asc,
491 }
492 }
493
494 pub fn order(&mut self, order: cursor::Order) -> &mut Self {
496 self.order = order;
497
498 self
499 }
500
501 pub fn desc(&mut self) -> &mut Self {
503 self.order(cursor::Order::Desc)
504 }
505
506 pub fn args(&mut self, args: Args) -> &mut Self {
508 self.args = args;
509
510 self
511 }
512
513 pub fn backward(&mut self, last: u16, before: Option<Value>) -> &mut Self {
520 self.args(Args {
521 last: Some(last),
522 before,
523 ..Default::default()
524 })
525 }
526
527 pub fn forward(&mut self, first: u16, after: Option<Value>) -> &mut Self {
534 self.args(Args {
535 first: Some(first),
536 after,
537 ..Default::default()
538 })
539 }
540
541 pub async fn execute<'e, 'c: 'e, DB, O, E>(
554 &mut self,
555 executor: E,
556 ) -> anyhow::Result<ReadResult<O>>
557 where
558 DB: Database,
559 E: 'e + sqlx::Executor<'c, Database = DB>,
560 O: for<'r> sqlx::FromRow<'r, DB::Row>,
561 O: Cursor,
562 O: Send + Unpin,
563 O: Bind<Cursor = O>,
564 <<O as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
565 <<O as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
566 sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
567 {
568 let limit = self.build_reader::<O, O>()?;
569
570 let (sql, values) = match DB::NAME {
571 #[cfg(feature = "sqlite")]
572 "SQLite" => self.statement.build_sqlx(SqliteQueryBuilder),
573 #[cfg(feature = "mysql")]
574 "MySQL" => self.build_sqlx(MysqlQueryBuilder),
575 #[cfg(feature = "postgres")]
576 "PostgreSQL" => self.build_sqlx(PostgresQueryBuilder),
577 name => panic!("'{name}' not supported, consider using SQLite, PostgreSQL or MySQL"),
578 };
579
580 let mut rows = sqlx::query_as_with::<DB, O, _>(&sql, values)
581 .fetch_all(executor)
582 .await?;
583
584 let has_more = rows.len() > limit as usize;
585 if has_more {
586 rows.pop();
587 }
588
589 let mut edges = vec![];
590 for node in rows.into_iter() {
591 edges.push(Edge {
592 cursor: node.serialize_cursor()?,
593 node,
594 });
595 }
596
597 if self.args.is_backward() {
598 edges = edges.into_iter().rev().collect();
599 }
600
601 let page_info = if self.args.is_backward() {
602 let start_cursor = edges.first().map(|e| e.cursor.clone());
603
604 PageInfo {
605 has_previous_page: has_more,
606 has_next_page: false,
607 start_cursor,
608 end_cursor: None,
609 }
610 } else {
611 let end_cursor = edges.last().map(|e| e.cursor.clone());
612 PageInfo {
613 has_previous_page: false,
614 has_next_page: has_more,
615 start_cursor: None,
616 end_cursor,
617 }
618 };
619
620 Ok(ReadResult { edges, page_info })
621 }
622
623 fn build_reader<O: Cursor, B: Bind<Cursor = O>>(&mut self) -> Result<u16, cursor::CursorError>
624 where
625 B::T: Clone,
626 <<B as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
627 <<B as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
628 {
629 let (limit, cursor) = self.args.get_info();
630
631 if let Some(cursor) = cursor.as_ref() {
632 self.build_reader_where::<O, B>(cursor)?;
633 }
634
635 self.build_reader_order::<B>();
636 self.limit((limit + 1).into());
637
638 Ok(limit)
639 }
640
641 fn build_reader_where<O, B>(&mut self, cursor: &Value) -> Result<(), cursor::CursorError>
642 where
643 O: Cursor,
644 B: Bind<Cursor = O>,
645 B::T: Clone,
646 <<B as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
647 <<B as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
648 {
649 let is_order_desc = self.is_order_desc();
650 let cursor = O::deserialize_cursor(cursor)?;
651 let colums = B::columns().into_iter().rev();
652 let values = B::values(cursor).into_iter().rev();
653
654 let mut expr = None::<Expr>;
655 for (col, value) in colums.zip(values) {
656 let current_expr = if is_order_desc {
657 Expr::col(col.clone()).lt(value.clone())
658 } else {
659 Expr::col(col.clone()).gt(value.clone())
660 };
661
662 let Some(ref prev_expr) = expr else {
663 expr = Some(current_expr.clone());
664 continue;
665 };
666
667 expr = Some(current_expr.or(Expr::col(col).eq(value).and(prev_expr.clone())));
668 }
669
670 self.and_where(expr.unwrap());
671
672 Ok(())
673 }
674
675 fn build_reader_order<O: Bind>(&mut self) {
676 let order = if self.is_order_desc() {
677 sea_query::Order::Desc
678 } else {
679 sea_query::Order::Asc
680 };
681
682 let colums = O::columns();
683 for col in colums {
684 self.order_by(col, order.clone());
685 }
686 }
687
688 fn is_order_desc(&self) -> bool {
689 matches!(
690 (&self.order, self.args.is_backward()),
691 (cursor::Order::Asc, true) | (cursor::Order::Desc, false)
692 )
693 }
694}
695
// Deref to the wrapped `SelectStatement` so sea-query builder methods
// (`and_where`, `limit`, `order_by`, ...) can be called on `Reader` directly.
impl Deref for Reader {
    type Target = SelectStatement;

    fn deref(&self) -> &Self::Target {
        &self.statement
    }
}

impl DerefMut for Reader {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.statement
    }
}
709
/// Maps a cursor-capable row type to the SQL columns and values used for
/// keyset pagination by [`Reader`].
pub trait Bind {
    /// Column identifier type.
    type T: IntoColumnRef + Clone;
    /// Iterable of cursor columns, most significant first.
    type I: IntoIterator<Item = Self::T>;
    /// Iterable of cursor values, in the same order as `columns()`.
    type V: IntoIterator<Item = Expr>;
    /// Row type whose cursor is being bound.
    type Cursor: Cursor;

    /// Columns forming the cursor key, in order of significance.
    fn columns() -> Self::I;
    /// Values extracted from a deserialized cursor, matching `columns()`.
    fn values(cursor: <<Self as Bind>::Cursor as Cursor>::T) -> Self::V;
}
742
/// Cursor binding for events: the pagination key is
/// `(timestamp, timestamp_subsec, version, id)`.
impl Bind for evento_core::Event {
    type T = Event;
    type I = [Self::T; 4];
    type V = [Expr; 4];
    type Cursor = Self;

    fn columns() -> Self::I {
        // Order of significance must match `values()` below exactly.
        [
            Event::Timestamp,
            Event::TimestampSubsec,
            Event::Version,
            Event::Id,
        ]
    }

    fn values(cursor: <<Self as Bind>::Cursor as Cursor>::T) -> Self::V {
        // Cursor fields appear to be t = timestamp, s = subsec, v = version,
        // i = id (inferred from `columns()` ordering) — confirm against
        // `evento_core::cursor`.
        [
            cursor.t.into(),
            cursor.s.into(),
            cursor.v.into(),
            cursor.i.into(),
        ]
    }
}
767
// Conversions so a driver-specific executor (owned or borrowed) can be
// handed straight to `evento_core::Evento::new`.

/// Builds an [`evento_core::Evento`] from an owned SQLite executor.
#[cfg(feature = "sqlite")]
impl From<Sqlite> for evento_core::Evento {
    fn from(value: Sqlite) -> Self {
        evento_core::Evento::new(value)
    }
}

/// Builds an [`evento_core::Evento`] from a borrowed SQLite executor
/// (cloning is cheap — it only clones the pool handle).
#[cfg(feature = "sqlite")]
impl From<&Sqlite> for evento_core::Evento {
    fn from(value: &Sqlite) -> Self {
        evento_core::Evento::new(value.clone())
    }
}

/// Builds an [`evento_core::Evento`] from an owned MySQL executor.
#[cfg(feature = "mysql")]
impl From<MySql> for evento_core::Evento {
    fn from(value: MySql) -> Self {
        evento_core::Evento::new(value)
    }
}

/// Builds an [`evento_core::Evento`] from a borrowed MySQL executor.
#[cfg(feature = "mysql")]
impl From<&MySql> for evento_core::Evento {
    fn from(value: &MySql) -> Self {
        evento_core::Evento::new(value.clone())
    }
}

/// Builds an [`evento_core::Evento`] from an owned PostgreSQL executor.
#[cfg(feature = "postgres")]
impl From<Postgres> for evento_core::Evento {
    fn from(value: Postgres) -> Self {
        evento_core::Evento::new(value)
    }
}

/// Builds an [`evento_core::Evento`] from a borrowed PostgreSQL executor.
#[cfg(feature = "postgres")]
impl From<&Postgres> for evento_core::Evento {
    fn from(value: &Postgres) -> Self {
        evento_core::Evento::new(value.clone())
    }
}