evento_sql/
sql.rs

1//! Core SQL implementation for event sourcing.
2
3use std::ops::{Deref, DerefMut};
4
5#[cfg(feature = "mysql")]
6use sea_query::MysqlQueryBuilder;
7#[cfg(feature = "postgres")]
8use sea_query::PostgresQueryBuilder;
9#[cfg(feature = "sqlite")]
10use sea_query::SqliteQueryBuilder;
11use sea_query::{Cond, Expr, ExprTrait, Iden, IntoColumnRef, OnConflict, Query, SelectStatement};
12use sea_query_sqlx::SqlxBinder;
13use sqlx::{Database, Pool};
14use ulid::Ulid;
15
16use evento_core::{
17    cursor::{self, Args, Cursor, Edge, PageInfo, ReadResult, Value},
18    Executor, ReadAggregator, WriteError,
19};
20
21/// Column identifiers for the `event` table.
22///
23/// Used with sea-query for type-safe SQL query construction.
24///
25/// # Columns
26///
27/// - `Id` - Event identifier (ULID format, VARCHAR(26))
28/// - `Name` - Event type name (VARCHAR(50))
29/// - `AggregatorType` - Aggregate root type (VARCHAR(50))
30/// - `AggregatorId` - Aggregate root instance ID (VARCHAR(26))
31/// - `Version` - Event sequence number within the aggregate
32/// - `Data` - Serialized event payload (BLOB, bitcode format)
33/// - `Metadata` - Serialized event metadata (BLOB, bitcode format)
34/// - `RoutingKey` - Optional routing key for partitioning (VARCHAR(50))
35/// - `Timestamp` - Event timestamp in seconds (BIGINT)
36/// - `TimestampSubsec` - Sub-second precision (BIGINT)
37#[derive(Iden, Clone)]
38pub enum Event {
39    /// The table name: `event`
40    Table,
41    /// Event ID column (ULID)
42    Id,
43    /// Event type name
44    Name,
45    /// Aggregate root type
46    AggregatorType,
47    /// Aggregate root instance ID
48    AggregatorId,
49    /// Event version/sequence number
50    Version,
51    /// Serialized event data
52    Data,
53    /// Serialized event metadata
54    Metadata,
55    /// Optional routing key
56    RoutingKey,
57    /// Timestamp in seconds
58    Timestamp,
59    /// Sub-second precision
60    TimestampSubsec,
61}
62
63/// Column identifiers for the `snapshot` table.
64///
65/// Used with sea-query for type-safe SQL query construction.
66///
67/// **Note:** The snapshot table is dropped in migration M0003 and is no longer used.
68#[derive(Iden)]
69pub enum Snapshot {
70    /// The table name: `snapshot`
71    Table,
72    /// Snapshot ID
73    Id,
74    /// Snapshot type
75    Type,
76    /// Event stream cursor position
77    Cursor,
78    /// Revision identifier
79    Revision,
80    /// Serialized snapshot data
81    Data,
82    /// Creation timestamp
83    CreatedAt,
84    /// Last update timestamp
85    UpdatedAt,
86}
87
88/// Column identifiers for the `subscriber` table.
89///
90/// Used with sea-query for type-safe SQL query construction.
91///
92/// # Columns
93///
94/// - `Key` - Subscriber identifier (primary key)
95/// - `WorkerId` - ULID of the current worker processing events
96/// - `Cursor` - Current position in the event stream
97/// - `Lag` - Number of events behind the latest
98/// - `Enabled` - Whether the subscription is active
99/// - `CreatedAt` / `UpdatedAt` - Timestamps
100#[derive(Iden)]
101pub enum Subscriber {
102    /// The table name: `subscriber`
103    Table,
104    /// Subscriber key (primary key)
105    Key,
106    /// Current worker ID (ULID)
107    WorkerId,
108    /// Current cursor position
109    Cursor,
110    /// Event lag counter
111    Lag,
112    /// Whether subscription is enabled
113    Enabled,
114    /// Creation timestamp
115    CreatedAt,
116    /// Last update timestamp
117    UpdatedAt,
118}
119
/// Type alias for MySQL executor.
///
/// Equivalent to `Sql<sqlx::MySql>`.
///
/// Only compiled when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
pub type MySql = Sql<sqlx::MySql>;

/// Read-write executor pair for MySQL.
///
/// Used in CQRS patterns where you may have separate read and write connections.
/// Both type parameters of `evento_core::Rw` are set to [`MySql`].
///
/// Only compiled when the `mysql` feature is enabled.
#[cfg(feature = "mysql")]
pub type RwMySql = evento_core::Rw<MySql, MySql>;
131
/// Type alias for PostgreSQL executor.
///
/// Equivalent to `Sql<sqlx::Postgres>`.
///
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub type Postgres = Sql<sqlx::Postgres>;

/// Read-write executor pair for PostgreSQL.
///
/// Used in CQRS patterns where you may have separate read and write connections.
/// Both type parameters of `evento_core::Rw` are set to [`Postgres`].
///
/// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub type RwPostgres = evento_core::Rw<Postgres, Postgres>;
143
/// Type alias for SQLite executor.
///
/// Equivalent to `Sql<sqlx::Sqlite>`.
///
/// Only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
pub type Sqlite = Sql<sqlx::Sqlite>;

/// Read-write executor pair for SQLite.
///
/// Used in CQRS patterns where you may have separate read and write connections.
/// Both type parameters of `evento_core::Rw` are set to [`Sqlite`].
///
/// Only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
pub type RwSqlite = evento_core::Rw<Sqlite, Sqlite>;
155
/// SQL database executor for event sourcing operations.
///
/// A generic wrapper around a SQLx connection pool that implements the
/// [`Executor`](evento_core::Executor) trait for storing and querying events.
///
/// # Type Parameters
///
/// - `DB` - The SQLx database type (e.g., `sqlx::Sqlite`, `sqlx::MySql`, `sqlx::Postgres`)
///
/// # Example
///
/// ```rust,ignore
/// use evento_sql::Sql;
/// use sqlx::sqlite::SqlitePoolOptions;
///
/// // Create a connection pool
/// let pool = SqlitePoolOptions::new()
///     .connect(":memory:")
///     .await?;
///
/// // Convert to Sql executor
/// let executor: Sql<sqlx::Sqlite> = pool.into();
///
/// // Or use the type alias
/// let executor: evento_sql::Sqlite = pool.into();
/// ```
///
/// # Executor Implementation
///
/// The `Sql` type implements [`Executor`](evento_core::Executor) with the following operations:
///
/// - **`read`** - Query events with filtering and cursor-based pagination
/// - **`write`** - Persist events with optimistic concurrency control
/// - **`get_subscriber_cursor`** - Get the current cursor position for a subscriber
/// - **`is_subscriber_running`** - Check if a subscriber is active with a specific worker
/// - **`upsert_subscriber`** - Create or update a subscriber record
/// - **`acknowledge`** - Update subscriber cursor after processing events
///
/// # Construction and cloning
///
/// The wrapped pool is a private field; a `Sql` is built from a `sqlx::Pool`
/// through its `From<Pool<DB>>` implementation, as in the example above.
/// Cloning a `Sql` clones the wrapped pool.
pub struct Sql<DB: Database>(Pool<DB>);
194
195impl<DB: Database> Sql<DB> {
196    fn build_sqlx<S: SqlxBinder>(statement: S) -> (String, sea_query_sqlx::SqlxValues) {
197        match DB::NAME {
198            #[cfg(feature = "sqlite")]
199            "SQLite" => statement.build_sqlx(SqliteQueryBuilder),
200            #[cfg(feature = "mysql")]
201            "MySQL" => statement.build_sqlx(MysqlQueryBuilder),
202            #[cfg(feature = "postgres")]
203            "PostgreSQL" => statement.build_sqlx(PostgresQueryBuilder),
204            name => panic!("'{name}' not supported, consider using SQLite, PostgreSQL or MySQL"),
205        }
206    }
207}
208
209#[async_trait::async_trait]
210impl<DB> Executor for Sql<DB>
211where
212    DB: Database,
213    for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
214    sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
215    String: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
216    bool: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
217    Vec<u8>: for<'r> sqlx::Decode<'r, DB> + sqlx::Type<DB>,
218    usize: sqlx::ColumnIndex<DB::Row>,
219    evento_core::Event: for<'r> sqlx::FromRow<'r, DB::Row>,
220{
221    async fn read(
222        &self,
223        aggregators: Option<Vec<ReadAggregator>>,
224        routing_key: Option<evento_core::RoutingKey>,
225        args: Args,
226    ) -> anyhow::Result<ReadResult<evento_core::Event>> {
227        let statement = Query::select()
228            .columns([
229                Event::Id,
230                Event::Name,
231                Event::AggregatorType,
232                Event::AggregatorId,
233                Event::Version,
234                Event::Data,
235                Event::Metadata,
236                Event::RoutingKey,
237                Event::Timestamp,
238                Event::TimestampSubsec,
239            ])
240            .from(Event::Table)
241            .conditions(
242                aggregators.is_some(),
243                |q| {
244                    let Some(aggregators) = aggregators else {
245                        return;
246                    };
247
248                    let mut cond = Cond::any();
249
250                    for aggregator in aggregators {
251                        let mut aggregator_cond = Cond::all()
252                            .add(Expr::col(Event::AggregatorType).eq(aggregator.aggregator_type));
253
254                        if let Some(id) = aggregator.aggregator_id {
255                            aggregator_cond =
256                                aggregator_cond.add(Expr::col(Event::AggregatorId).eq(id));
257                        }
258
259                        if let Some(name) = aggregator.name {
260                            aggregator_cond = aggregator_cond.add(Expr::col(Event::Name).eq(name));
261                        }
262
263                        cond = cond.add(aggregator_cond);
264                    }
265
266                    q.and_where(cond.into());
267                },
268                |_| {},
269            )
270            .conditions(
271                matches!(routing_key, Some(evento_core::RoutingKey::Value(_))),
272                |q| {
273                    if let Some(evento_core::RoutingKey::Value(Some(ref routing_key))) = routing_key
274                    {
275                        q.and_where(Expr::col(Event::RoutingKey).eq(routing_key));
276                    }
277
278                    if let Some(evento_core::RoutingKey::Value(None)) = routing_key {
279                        q.and_where(Expr::col(Event::RoutingKey).is_null());
280                    }
281                },
282                |_q| {},
283            )
284            .to_owned();
285
286        Ok(Reader::new(statement)
287            .args(args)
288            .execute::<_, evento_core::Event, _>(&self.0)
289            .await?)
290    }
291
292    async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
293        let statement = Query::select()
294            .columns([Subscriber::Cursor])
295            .from(Subscriber::Table)
296            .and_where(Expr::col(Subscriber::Key).eq(Expr::value(key)))
297            .limit(1)
298            .to_owned();
299
300        let (sql, values) = Self::build_sqlx(statement);
301
302        let Some((cursor,)) = sqlx::query_as_with::<DB, (Option<String>,), _>(&sql, values)
303            .fetch_optional(&self.0)
304            .await?
305        else {
306            return Ok(None);
307        };
308
309        Ok(cursor.map(|c| c.into()))
310    }
311
312    async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
313        let statement = Query::select()
314            .columns([Subscriber::WorkerId, Subscriber::Enabled])
315            .from(Subscriber::Table)
316            .and_where(Expr::col(Subscriber::Key).eq(Expr::value(key)))
317            .limit(1)
318            .to_owned();
319
320        let (sql, values) = Self::build_sqlx(statement);
321
322        let (id, enabled) = sqlx::query_as_with::<DB, (String, bool), _>(&sql, values)
323            .fetch_one(&self.0)
324            .await?;
325
326        Ok(worker_id.to_string() == id && enabled)
327    }
328
329    async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
330        let statement = Query::insert()
331            .into_table(Subscriber::Table)
332            .columns([Subscriber::Key, Subscriber::WorkerId, Subscriber::Lag])
333            .values_panic([key.into(), worker_id.to_string().into(), 0.into()])
334            .on_conflict(
335                OnConflict::column(Subscriber::Key)
336                    .update_columns([Subscriber::WorkerId])
337                    .value(Subscriber::UpdatedAt, Expr::current_timestamp())
338                    .to_owned(),
339            )
340            .to_owned();
341
342        let (sql, values) = Self::build_sqlx(statement);
343
344        sqlx::query_with::<DB, _>(&sql, values)
345            .execute(&self.0)
346            .await?;
347
348        Ok(())
349    }
350
351    async fn write(&self, events: Vec<evento_core::Event>) -> Result<(), WriteError> {
352        let mut statement = Query::insert()
353            .into_table(Event::Table)
354            .columns([
355                Event::Id,
356                Event::Name,
357                Event::Data,
358                Event::Metadata,
359                Event::AggregatorType,
360                Event::AggregatorId,
361                Event::Version,
362                Event::RoutingKey,
363                Event::Timestamp,
364                Event::TimestampSubsec,
365            ])
366            .to_owned();
367
368        for event in events {
369            statement.values_panic([
370                event.id.to_string().into(),
371                event.name.into(),
372                event.data.into(),
373                event.metadata.into(),
374                event.aggregator_type.into(),
375                event.aggregator_id.into(),
376                event.version.into(),
377                event.routing_key.into(),
378                event.timestamp.into(),
379                event.timestamp_subsec.into(),
380            ]);
381        }
382
383        let (sql, values) = Self::build_sqlx(statement);
384
385        sqlx::query_with::<DB, _>(&sql, values)
386            .execute(&self.0)
387            .await
388            .map_err(|err| {
389                let err_str = err.to_string();
390                if err_str.contains("(code: 2067)") {
391                    return WriteError::InvalidOriginalVersion;
392                }
393                if err_str.contains("1062 (23000): Duplicate entry") {
394                    return WriteError::InvalidOriginalVersion;
395                }
396                if err_str.contains("duplicate key value violates unique constraint") {
397                    return WriteError::InvalidOriginalVersion;
398                }
399                WriteError::Unknown(err.into())
400            })?;
401
402        Ok(())
403    }
404
405    async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
406        let statement = Query::update()
407            .table(Subscriber::Table)
408            .values([
409                (Subscriber::Cursor, cursor.0.into()),
410                (Subscriber::Lag, lag.into()),
411                (Subscriber::UpdatedAt, Expr::current_timestamp()),
412            ])
413            .and_where(Expr::col(Subscriber::Key).eq(key))
414            .to_owned();
415
416        let (sql, values) = Self::build_sqlx(statement);
417
418        sqlx::query_with::<DB, _>(&sql, values)
419            .execute(&self.0)
420            .await?;
421
422        Ok(())
423    }
424}
425
426impl<D: Database> Clone for Sql<D> {
427    fn clone(&self) -> Self {
428        Self(self.0.clone())
429    }
430}
431
432impl<D: Database> From<Pool<D>> for Sql<D> {
433    fn from(value: Pool<D>) -> Self {
434        Self(value)
435    }
436}
437
/// Query builder for reading events with cursor-based pagination.
///
/// `Reader` wraps a sea-query [`SelectStatement`] and adds support for:
/// - Forward pagination (first N after cursor)
/// - Backward pagination (last N before cursor)
/// - Ascending/descending order
///
/// # Example
///
/// ```rust,ignore
/// use evento_sql::{Reader, Event};
/// use sea_query::Query;
///
/// let statement = Query::select()
///     .columns([Event::Id, Event::Name, Event::Data])
///     .from(Event::Table)
///     .to_owned();
///
/// // `Reader::new` takes the statement by value, so clone it to keep a copy
/// // for the next page.
/// let result = Reader::new(statement.clone())
///     .forward(10, None)  // First 10 events
///     .execute::<_, MyEvent, _>(&pool)
///     .await?;
///
/// for edge in result.edges {
///     println!("Event: {:?}, Cursor: {:?}", edge.node, edge.cursor);
/// }
///
/// // Continue with next page
/// if result.page_info.has_next_page {
///     let next_result = Reader::new(statement)
///         .forward(10, result.page_info.end_cursor)
///         .execute::<_, MyEvent, _>(&pool)
///         .await?;
/// }
/// ```
///
/// # Deref
///
/// `Reader` implements `Deref` and `DerefMut` to the underlying `SelectStatement`,
/// allowing direct access to sea-query builder methods.
pub struct Reader {
    // Base select statement; pagination filters, ordering and the limit are
    // added to it when the reader is executed.
    statement: SelectStatement,
    // Pagination arguments; `Args::default()` until set through `args`,
    // `forward` or `backward`.
    args: Args,
    // Requested sort order; ascending unless changed through `order` or `desc`.
    order: cursor::Order,
}
483
484impl Reader {
485    /// Creates a new reader from a sea-query select statement.
486    pub fn new(statement: SelectStatement) -> Self {
487        Self {
488            statement,
489            args: Args::default(),
490            order: cursor::Order::Asc,
491        }
492    }
493
    /// Sets the sort order for results.
    ///
    /// Replaces any previously configured order and returns `self` so calls
    /// can be chained.
    pub fn order(&mut self, order: cursor::Order) -> &mut Self {
        self.order = order;

        self
    }
500
501    /// Sets descending sort order.
502    pub fn desc(&mut self) -> &mut Self {
503        self.order(cursor::Order::Desc)
504    }
505
    /// Sets pagination arguments directly.
    ///
    /// Replaces any previously configured arguments and returns `self` so
    /// calls can be chained.
    pub fn args(&mut self, args: Args) -> &mut Self {
        self.args = args;

        self
    }
512
513    /// Configures backward pagination (last N before cursor).
514    ///
515    /// # Arguments
516    ///
517    /// - `last` - Number of items to return
518    /// - `before` - Optional cursor to paginate before
519    pub fn backward(&mut self, last: u16, before: Option<Value>) -> &mut Self {
520        self.args(Args {
521            last: Some(last),
522            before,
523            ..Default::default()
524        })
525    }
526
527    /// Configures forward pagination (first N after cursor).
528    ///
529    /// # Arguments
530    ///
531    /// - `first` - Number of items to return
532    /// - `after` - Optional cursor to paginate after
533    pub fn forward(&mut self, first: u16, after: Option<Value>) -> &mut Self {
534        self.args(Args {
535            first: Some(first),
536            after,
537            ..Default::default()
538        })
539    }
540
541    /// Executes the query and returns paginated results.
542    ///
543    /// # Type Parameters
544    ///
545    /// - `DB` - The SQLx database type
546    /// - `O` - The output row type (must implement `FromRow`, `Cursor`, and `Bind`)
547    /// - `E` - The executor type
548    ///
549    /// # Returns
550    ///
551    /// A [`ReadResult`](evento_core::cursor::ReadResult) containing edges with nodes and cursors,
552    /// plus pagination info.
553    pub async fn execute<'e, 'c: 'e, DB, O, E>(
554        &mut self,
555        executor: E,
556    ) -> anyhow::Result<ReadResult<O>>
557    where
558        DB: Database,
559        E: 'e + sqlx::Executor<'c, Database = DB>,
560        O: for<'r> sqlx::FromRow<'r, DB::Row>,
561        O: Cursor,
562        O: Send + Unpin,
563        O: Bind<Cursor = O>,
564        <<O as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
565        <<O as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
566        sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
567    {
568        let limit = self.build_reader::<O, O>()?;
569
570        let (sql, values) = match DB::NAME {
571            #[cfg(feature = "sqlite")]
572            "SQLite" => self.statement.build_sqlx(SqliteQueryBuilder),
573            #[cfg(feature = "mysql")]
574            "MySQL" => self.build_sqlx(MysqlQueryBuilder),
575            #[cfg(feature = "postgres")]
576            "PostgreSQL" => self.build_sqlx(PostgresQueryBuilder),
577            name => panic!("'{name}' not supported, consider using SQLite, PostgreSQL or MySQL"),
578        };
579
580        let mut rows = sqlx::query_as_with::<DB, O, _>(&sql, values)
581            .fetch_all(executor)
582            .await?;
583
584        let has_more = rows.len() > limit as usize;
585        if has_more {
586            rows.pop();
587        }
588
589        let mut edges = vec![];
590        for node in rows.into_iter() {
591            edges.push(Edge {
592                cursor: node.serialize_cursor()?,
593                node,
594            });
595        }
596
597        if self.args.is_backward() {
598            edges = edges.into_iter().rev().collect();
599        }
600
601        let page_info = if self.args.is_backward() {
602            let start_cursor = edges.first().map(|e| e.cursor.clone());
603
604            PageInfo {
605                has_previous_page: has_more,
606                has_next_page: false,
607                start_cursor,
608                end_cursor: None,
609            }
610        } else {
611            let end_cursor = edges.last().map(|e| e.cursor.clone());
612            PageInfo {
613                has_previous_page: false,
614                has_next_page: has_more,
615                start_cursor: None,
616                end_cursor,
617            }
618        };
619
620        Ok(ReadResult { edges, page_info })
621    }
622
623    fn build_reader<O: Cursor, B: Bind<Cursor = O>>(&mut self) -> Result<u16, cursor::CursorError>
624    where
625        B::T: Clone,
626        <<B as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
627        <<B as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
628    {
629        let (limit, cursor) = self.args.get_info();
630
631        if let Some(cursor) = cursor.as_ref() {
632            self.build_reader_where::<O, B>(cursor)?;
633        }
634
635        self.build_reader_order::<B>();
636        self.limit((limit + 1).into());
637
638        Ok(limit)
639    }
640
641    fn build_reader_where<O, B>(&mut self, cursor: &Value) -> Result<(), cursor::CursorError>
642    where
643        O: Cursor,
644        B: Bind<Cursor = O>,
645        B::T: Clone,
646        <<B as Bind>::I as IntoIterator>::IntoIter: DoubleEndedIterator,
647        <<B as Bind>::V as IntoIterator>::IntoIter: DoubleEndedIterator,
648    {
649        let is_order_desc = self.is_order_desc();
650        let cursor = O::deserialize_cursor(cursor)?;
651        let colums = B::columns().into_iter().rev();
652        let values = B::values(cursor).into_iter().rev();
653
654        let mut expr = None::<Expr>;
655        for (col, value) in colums.zip(values) {
656            let current_expr = if is_order_desc {
657                Expr::col(col.clone()).lt(value.clone())
658            } else {
659                Expr::col(col.clone()).gt(value.clone())
660            };
661
662            let Some(ref prev_expr) = expr else {
663                expr = Some(current_expr.clone());
664                continue;
665            };
666
667            expr = Some(current_expr.or(Expr::col(col).eq(value).and(prev_expr.clone())));
668        }
669
670        self.and_where(expr.unwrap());
671
672        Ok(())
673    }
674
675    fn build_reader_order<O: Bind>(&mut self) {
676        let order = if self.is_order_desc() {
677            sea_query::Order::Desc
678        } else {
679            sea_query::Order::Asc
680        };
681
682        let colums = O::columns();
683        for col in colums {
684            self.order_by(col, order.clone());
685        }
686    }
687
688    fn is_order_desc(&self) -> bool {
689        matches!(
690            (&self.order, self.args.is_backward()),
691            (cursor::Order::Asc, true) | (cursor::Order::Desc, false)
692        )
693    }
694}
695
696impl Deref for Reader {
697    type Target = SelectStatement;
698
699    fn deref(&self) -> &Self::Target {
700        &self.statement
701    }
702}
703
704impl DerefMut for Reader {
705    fn deref_mut(&mut self) -> &mut Self::Target {
706        &mut self.statement
707    }
708}
709
/// Trait for binding cursor values in paginated queries.
///
/// This trait defines how to serialize cursor data for keyset pagination.
/// It specifies which columns are used for ordering and how to extract
/// their values from a cursor.
///
/// # Implementation
///
/// The trait is implemented for [`evento_core::Event`] to enable pagination
/// over the event table using timestamp, version, and ID columns.
///
/// [`Reader`] pairs `columns()` with `values()` positionally, so both must
/// yield the same number of items in the same order. The columns are also
/// added to `ORDER BY` in the order returned, which makes the first column
/// the most significant sort key.
///
/// # Associated Types
///
/// - `T` - Column reference type
/// - `I` - Iterator over column references
/// - `V` - Iterator over value expressions
/// - `Cursor` - The cursor type that provides pagination data
pub trait Bind {
    /// Column reference type (e.g., `Event` enum variant).
    type T: IntoColumnRef + Clone;
    /// Iterator type for columns.
    type I: IntoIterator<Item = Self::T>;
    /// Iterator type for values.
    type V: IntoIterator<Item = Expr>;
    /// The cursor type used for pagination.
    type Cursor: Cursor;

    /// Returns the columns used for cursor-based ordering.
    ///
    /// The order of the returned columns is the sort precedence.
    fn columns() -> Self::I;
    /// Extracts values from a cursor for WHERE clause construction.
    ///
    /// The values must line up one-to-one with the columns from `columns()`.
    fn values(cursor: <<Self as Bind>::Cursor as Cursor>::T) -> Self::V;
}
742
/// Pagination binding for events: rows are ordered by timestamp, then
/// sub-second part, then version, then id.
impl Bind for evento_core::Event {
    type T = Event;
    type I = [Self::T; 4];
    type V = [Expr; 4];
    type Cursor = Self;

    /// Ordering columns, most significant first.
    fn columns() -> Self::I {
        [
            Event::Timestamp,
            Event::TimestampSubsec,
            Event::Version,
            Event::Id,
        ]
    }

    /// Cursor values in the same order as `columns()`: `t`, `s`, `v` and `i`
    /// are compared against the timestamp, sub-second, version and id columns
    /// respectively.
    fn values(cursor: <<Self as Bind>::Cursor as Cursor>::T) -> Self::V {
        [
            cursor.t.into(),
            cursor.s.into(),
            cursor.v.into(),
            cursor.i.into(),
        ]
    }
}
767
#[cfg(feature = "sqlite")]
impl From<Sqlite> for evento_core::Evento {
    /// Builds an `Evento` backed by the given SQLite executor.
    fn from(executor: Sqlite) -> Self {
        Self::new(executor)
    }
}
774
#[cfg(feature = "sqlite")]
impl From<&Sqlite> for evento_core::Evento {
    /// Builds an `Evento` backed by a clone of the given SQLite executor.
    fn from(executor: &Sqlite) -> Self {
        Self::new(Sqlite::clone(executor))
    }
}
781
#[cfg(feature = "mysql")]
impl From<MySql> for evento_core::Evento {
    /// Builds an `Evento` backed by the given MySQL executor.
    fn from(executor: MySql) -> Self {
        Self::new(executor)
    }
}
788
#[cfg(feature = "mysql")]
impl From<&MySql> for evento_core::Evento {
    /// Builds an `Evento` backed by a clone of the given MySQL executor.
    fn from(executor: &MySql) -> Self {
        Self::new(MySql::clone(executor))
    }
}
795
#[cfg(feature = "postgres")]
impl From<Postgres> for evento_core::Evento {
    /// Builds an `Evento` backed by the given PostgreSQL executor.
    fn from(executor: Postgres) -> Self {
        Self::new(executor)
    }
}
802
#[cfg(feature = "postgres")]
impl From<&Postgres> for evento_core::Evento {
    /// Builds an `Evento` backed by a clone of the given PostgreSQL executor.
    fn from(executor: &Postgres) -> Self {
        Self::new(Postgres::clone(executor))
    }
}
808}