apalis_sqlite/
callback.rs

1use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
2use futures::{Stream, StreamExt};
3use sqlx::sqlite::{SqliteOperation, UpdateHookResult};
4use std::pin::Pin;
5use std::sync::{Arc, Mutex};
6use std::task::{Context, Poll};
7
8/// Database event emitted by SQLite update hook
9#[derive(Debug)]
10pub struct DbEvent {
11    op: SqliteOperation,
12    db_name: String,
13    table_name: String,
14    rowid: i64,
15}
16
17impl DbEvent {
18    /// Get the operation type of the database event
19    #[must_use]
20    pub fn operation(&self) -> &SqliteOperation {
21        &self.op
22    }
23
24    /// Get the database name of the database event
25    #[must_use]
26    pub fn db_name(&self) -> &str {
27        &self.db_name
28    }
29
30    /// Get the table name of the database event
31    #[must_use]
32    pub fn table_name(&self) -> &str {
33        &self.table_name
34    }
35
36    /// Get the rowid of the database event
37    #[must_use]
38    pub fn rowid(&self) -> i64 {
39        self.rowid
40    }
41}
42
43// Callback for SQLite update hook
44pub(crate) fn update_hook_callback(event: UpdateHookResult<'_>, tx: &mut UnboundedSender<DbEvent>) {
45    tx.start_send(DbEvent {
46        op: event.operation,
47        db_name: event.database.to_owned(),
48        table_name: event.table.to_owned(),
49        rowid: event.rowid,
50    })
51    .unwrap();
52}
53
54/// Listener for database events emitted by SQLite update hook
55#[derive(Debug, Clone)]
56pub struct HookCallbackListener {
57    rx: Arc<Mutex<UnboundedReceiver<DbEvent>>>,
58}
59
60impl HookCallbackListener {
61    /// Create a new HookCallbackListener
62    #[must_use]
63    pub fn new(rx: UnboundedReceiver<DbEvent>) -> Self {
64        Self {
65            rx: Arc::new(Mutex::new(rx)),
66        }
67    }
68}
69
70impl Stream for HookCallbackListener {
71    type Item = DbEvent;
72
73    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74        self.rx.lock().unwrap().poll_next_unpin(cx)
75    }
76}