use evento::{
cursor::{Args, Order, ReadResult},
sql::Reader,
sql_migrator::{InitMigration, M0002, M0003},
Event,
};
use evento_test::assert_read_result;
use sea_query::{MysqlQueryBuilder, PostgresQueryBuilder, Query, SqliteQueryBuilder};
use sea_query_sqlx::SqlxBinder;
use sqlx::{any::install_default_drivers, migrate::MigrateDatabase, Any, Database, Pool};
use sqlx_migrator::{Migrate, Plan};
pub async fn forward_asc<DB>(pool: Pool<DB>) -> anyhow::Result<()>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
usize: sqlx::ColumnIndex<DB::Row>,
evento_sql::SqlEvent: for<'r> sqlx::FromRow<'r, DB::Row>,
{
let data = get_data(&pool).await?;
let order = Order::Asc;
let args = Args::forward((data.len() + 1) as u16, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::forward(4, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
let end_cursor = result.page_info.end_cursor.clone();
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::forward(4, end_cursor);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order, data, result)?;
Ok(())
}
pub async fn forward_desc<DB>(pool: Pool<DB>) -> anyhow::Result<()>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
usize: sqlx::ColumnIndex<DB::Row>,
evento_sql::SqlEvent: for<'r> sqlx::FromRow<'r, DB::Row>,
{
let data = get_data(&pool).await?;
let order = Order::Desc;
let args = Args::forward((data.len() + 1) as u16, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::forward(4, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
let end_cursor = result.page_info.end_cursor.clone();
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::forward(4, end_cursor);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order, data, result)?;
Ok(())
}
pub async fn backward_asc<DB>(pool: Pool<DB>) -> anyhow::Result<()>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
usize: sqlx::ColumnIndex<DB::Row>,
evento_sql::SqlEvent: for<'r> sqlx::FromRow<'r, DB::Row>,
{
let data = get_data(&pool).await?;
let order = Order::Asc;
let args = Args::backward((data.len() + 1) as u16, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::backward(4, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
let start_cursor = result.page_info.start_cursor.clone();
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::backward(4, start_cursor);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order, data, result)?;
Ok(())
}
pub async fn backward_desc<DB>(pool: Pool<DB>) -> anyhow::Result<()>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
usize: sqlx::ColumnIndex<DB::Row>,
evento_sql::SqlEvent: for<'r> sqlx::FromRow<'r, DB::Row>,
{
let data = get_data(&pool).await?;
let order = Order::Desc;
let args = Args::backward((data.len() + 1) as u16, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::backward(4, None);
let result = read(&pool, args.clone(), order.to_owned()).await?;
let start_cursor = result.page_info.start_cursor.clone();
assert_read_result(args, order.to_owned(), data.clone(), result)?;
let args = Args::backward(4, start_cursor);
let result = read(&pool, args.clone(), order.to_owned()).await?;
assert_read_result(args, order, data, result)?;
Ok(())
}
pub async fn read<DB>(
pool: &Pool<DB>,
args: Args,
order: Order,
) -> anyhow::Result<ReadResult<Event>>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
usize: sqlx::ColumnIndex<DB::Row>,
evento_sql::SqlEvent: for<'r> sqlx::FromRow<'r, DB::Row>,
{
let statement = Query::select()
.columns([
evento::sql::Event::Id,
evento::sql::Event::Name,
evento::sql::Event::AggregatorType,
evento::sql::Event::AggregatorId,
evento::sql::Event::Version,
evento::sql::Event::Data,
evento::sql::Event::Metadata,
evento::sql::Event::RoutingKey,
evento::sql::Event::Timestamp,
evento::sql::Event::TimestampSubsec,
])
.from(evento::sql::Event::Table)
.to_owned();
Ok(Reader::new(statement)
.args(args)
.order(order)
.execute::<_, evento_sql::SqlEvent, _>(pool)
.await?
.map(|e| e.0))
}
pub async fn get_data<DB>(pool: &Pool<DB>) -> anyhow::Result<Vec<Event>>
where
DB: Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
sea_query_sqlx::SqlxValues: for<'q> sqlx::IntoArguments<'q, DB>,
{
let data = evento_test::get_data();
let mut statement = Query::insert()
.into_table(evento::sql::Event::Table)
.columns([
evento::sql::Event::Id,
evento::sql::Event::Name,
evento::sql::Event::Data,
evento::sql::Event::Metadata,
evento::sql::Event::AggregatorType,
evento::sql::Event::AggregatorId,
evento::sql::Event::Version,
evento::sql::Event::RoutingKey,
evento::sql::Event::Timestamp,
evento::sql::Event::TimestampSubsec,
])
.to_owned();
for event in data.clone() {
let metadata = bitcode::encode(&event.metadata);
statement.values_panic([
event.id.to_string().into(),
event.name.into(),
event.data.into(),
metadata.into(),
event.aggregator_type.into(),
event.aggregator_id.to_string().into(),
event.version.into(),
event.routing_key.into(),
event.timestamp.into(),
event.timestamp_subsec.into(),
]);
}
let (sql, values) = match DB::NAME {
"SQLite" => statement.build_sqlx(SqliteQueryBuilder),
"MySQL" => statement.build_sqlx(MysqlQueryBuilder),
"PostgreSQL" => statement.build_sqlx(PostgresQueryBuilder),
name => panic!("'{name}' not supported, consider using SQLite, PostgreSQL or MySQL"),
};
sqlx::query_with::<DB, _>(&sql, values)
.execute(pool)
.await?;
Ok(data)
}
pub async fn create_pool<DB: Database>(url: impl Into<String>) -> anyhow::Result<Pool<DB>>
where
for<'q> DB::Arguments<'q>: sqlx::IntoArguments<'q, DB>,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
InitMigration: sqlx_migrator::Migration<DB>,
M0002: sqlx_migrator::Migration<DB>,
M0003: sqlx_migrator::Migration<DB>,
sqlx_migrator::Migrator<DB>: sqlx_migrator::migrator::DatabaseOperation<DB>,
{
install_default_drivers();
let url = url.into();
let _ = Any::drop_database(&url).await;
Any::create_database(&url).await?;
let pool = Pool::<DB>::connect(&url).await?;
let mut conn = pool.acquire().await?;
let migrator = evento::sql_migrator::new::<DB>()?;
migrator.run(&mut *conn, &Plan::apply_all()).await?;
Ok(pool)
}