mod entity;
pub mod error;
mod repo;
use sqlx::PgPool;
#[cfg(feature = "import")]
use crate::primitives::DataSourceId;
use crate::{
ledger_operation::*,
outbox::*,
primitives::{AccountId, DataSource},
};
pub use entity::*;
use error::*;
pub use repo::entry_cursor::EntriesByCreatedAtCursor;
use repo::*;
/// Handle for working with ledger entries.
///
/// Bundles the entry repository, the outbox used for publishing entry events,
/// and a handle to the Postgres pool. Derives `Clone`, so the handle can be
/// duplicated by cloning each of its fields.
#[derive(Clone)]
pub struct Entries {
// Repository used for all entry persistence and queries.
repo: EntryRepo,
// Outbox used to persist `OutboxEventPayload`s produced for entries.
outbox: Outbox,
// Clone of the pool passed to `new`; the underscore prefix marks it as
// not read by the methods of this type.
_pool: PgPool,
}
impl Entries {
    /// Builds an `Entries` handle from a connection pool and an outbox.
    ///
    /// The pool is used to construct the underlying `EntryRepo`, and a clone
    /// of it is kept on the handle itself.
    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
        let repo = EntryRepo::new(pool);
        let _pool = pool.clone();
        Self {
            repo,
            outbox,
            _pool,
        }
    }

    /// Returns one page of entries belonging to `account_id`.
    ///
    /// Delegates to the repository's `list_for_account_id_by_created_at`,
    /// forwarding the pagination arguments and list direction unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever `EntryError` the repository query reports.
    pub async fn list_for_account_id(
        &self,
        account_id: AccountId,
        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
        direction: es_entity::ListDirection,
    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
        let page = self
            .repo
            .list_for_account_id_by_created_at(account_id, query, direction)
            .await?;
        Ok(page)
    }

    /// Creates all `entries` inside the ledger operation `db`.
    ///
    /// After the repository has created the entries, one
    /// `OutboxEventPayload::EntryCreated` with source `DataSource::Local` is
    /// accumulated on `db` per created entry. The values of the created
    /// entries are returned in the order the repository produced them.
    ///
    /// # Errors
    ///
    /// Returns an `EntryError` if the repository fails to create the entries;
    /// in that case nothing is accumulated on `db`.
    pub(crate) async fn create_all_in_op(
        &self,
        db: &mut LedgerOperation<'_>,
        entries: Vec<NewEntry>,
    ) -> Result<Vec<EntryValues>, EntryError> {
        let created = self.repo.create_all_in_op(db.op(), entries).await?;

        // Lazily describe each new entry as a locally sourced outbox event.
        let payloads = created
            .iter()
            .map(|new_entry| OutboxEventPayload::EntryCreated {
                source: DataSource::Local,
                entry: new_entry.values().clone(),
            });
        db.accumulate(payloads);

        let values: Vec<EntryValues> = created
            .into_iter()
            .map(|new_entry| new_entry.into_values())
            .collect();
        Ok(values)
    }

    /// Records an entry that originated from another data source.
    ///
    /// Only compiled when the `import` feature is enabled. The entry is built
    /// with `Entry::import`, persisted through the repository's `import`, and
    /// the events yielded by `last_persisted(1)` are converted into outbox
    /// payloads. Those payloads are persisted via the outbox using the
    /// timestamp taken from `db.now()`; `db` is consumed in the process.
    ///
    /// # Errors
    ///
    /// Returns an `EntryError` if either the repository import or the outbox
    /// persistence fails.
    #[cfg(feature = "import")]
    pub(crate) async fn sync_entry_creation(
        &self,
        mut db: es_entity::DbOp<'_>,
        origin: DataSourceId,
        values: EntryValues,
    ) -> Result<(), EntryError> {
        let mut imported = Entry::import(origin, values);
        self.repo.import(&mut db, origin, &mut imported).await?;

        // Capture the timestamp before `db` is turned into a transaction.
        let recorded_at = db.now();
        let payloads = imported
            .events
            .last_persisted(1)
            .map(|persisted| OutboxEventPayload::from(&persisted.event))
            .collect::<Vec<_>>();

        let tx = db.into_tx();
        self.outbox
            .persist_events_at(tx, payloads, recorded_at)
            .await?;
        Ok(())
    }
}
impl From<&EntryEvent> for OutboxEventPayload {
fn from(event: &EntryEvent) -> Self {
match event {
#[cfg(feature = "import")]
EntryEvent::Imported {
source,
values: entry,
} => OutboxEventPayload::EntryCreated {
source: *source,
entry: entry.clone(),
},
EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
source: DataSource::Local,
entry: entry.clone(),
},
}
}
}