apalis_sqlite/
callback.rs

1use futures::channel::mpsc::{self, UnboundedReceiver};
2use futures::{Stream, StreamExt};
3use std::ffi::{CStr, c_void};
4use std::os::raw::{c_char, c_int};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[derive(Debug)]
9pub struct DbEvent {
10    op: &'static str,
11    db_name: String,
12    table_name: String,
13    rowid: i64,
14}
15
16impl DbEvent {
17    pub fn operation(&self) -> &'static str {
18        self.op
19    }
20
21    pub fn db_name(&self) -> &str {
22        &self.db_name
23    }
24
25    pub fn table_name(&self) -> &str {
26        &self.table_name
27    }
28
29    pub fn rowid(&self) -> i64 {
30        self.rowid
31    }
32}
33
34// Callback for SQLite update hook
35pub(crate) extern "C" fn update_hook_callback(
36    arg: *mut c_void,
37    op: c_int,
38    db_name: *const c_char,
39    table_name: *const c_char,
40    rowid: i64,
41) {
42    let op_str = match op {
43        libsqlite3_sys::SQLITE_INSERT => "INSERT",
44        libsqlite3_sys::SQLITE_UPDATE => "UPDATE",
45        libsqlite3_sys::SQLITE_DELETE => "DELETE",
46        _ => "UNKNOWN",
47    };
48
49    unsafe {
50        let db = CStr::from_ptr(db_name).to_string_lossy().to_string();
51        let table = CStr::from_ptr(table_name).to_string_lossy().to_string();
52
53        // Recover sender from raw pointer
54        let tx = &mut *(arg as *mut mpsc::UnboundedSender<DbEvent>);
55
56        // Ignore send errors (receiver closed)
57        let _ = tx.start_send(DbEvent {
58            op: op_str,
59            db_name: db,
60            table_name: table,
61            rowid,
62        });
63    }
64}
65
66#[derive(Debug, Clone)]
67pub struct HookCallbackListener;
68
69#[derive(Debug)]
70pub struct CallbackListener {
71    rx: UnboundedReceiver<DbEvent>,
72}
73impl CallbackListener {
74    pub fn new(rx: UnboundedReceiver<DbEvent>) -> Self {
75        Self { rx }
76    }
77}
78
79impl Stream for CallbackListener {
80    type Item = DbEvent;
81
82    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83        self.rx.poll_next_unpin(cx)
84    }
85}