Skip to main content

imessage_database/tables/
table.rs

/*!
 Table traits, database connection helpers, and shared table constants.

 # Streaming API

 The streaming API processes each row through a callback without collecting the
 table into a `Vec`.

 ```no_run
 use imessage_database::{
    error::table::TableError,
    tables::{
        table::{get_connection, Table},
        messages::Message,
    },
    util::dirs::default_db_path
 };

 let db_path = default_db_path();
 let db = get_connection(&db_path).unwrap();

 Message::stream(&db, |message_result| {
     match message_result {
         Ok(message) => println!("Message: {:#?}", message),
         Err(e) => eprintln!("Error: {:?}", e),
     }
     Ok::<(), TableError>(())
 }).unwrap();
 ```

 The callback may return any error type that implements `From<TableError>`.
*/
33
34use std::{collections::HashMap, fs::metadata, path::Path};
35
36use rusqlite::{
37    CachedStatement, Connection, Error, OpenFlags, Params, Result, Row, Statement, blob::Blob,
38};
39
40use crate::error::table::{TableConnectError, TableError};
41
42// MARK: Traits
43/// Database table model that can deserialize itself from SQLite rows.
44pub trait Table: Sized {
45    /// Deserialize a single row into `Self`. Returns [`rusqlite::Result`]
46    /// for direct use inside `rusqlite::query_map` / `query_row`
47    /// callbacks. For high-level iteration, prefer [`Table::rows`] or
48    /// [`Table::row`].
49    fn from_row(row: &Row) -> Result<Self>;
50
51    /// Prepare the table's default `SELECT *` statement.
52    fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError>;
53
54    /// Iterate over rows produced by `stmt`, deserializing each via
55    /// [`from_row`](Self::from_row). Errors at row-fetch or row-deserialize
56    /// time are surfaced uniformly as [`TableError`]. Accepts both
57    /// [`rusqlite::Statement`] and [`rusqlite::CachedStatement`] (the
58    /// latter via deref coercion).
59    ///
60    /// Use this when the caller owns a custom prepared statement (with
61    /// filters, joins, or bound parameters). For a full-table scan against
62    /// the default `SELECT *` with a callback API, see [`Table::stream`].
63    fn rows<'stmt, P: Params>(
64        stmt: &'stmt mut Statement<'_>,
65        params: P,
66    ) -> Result<impl Iterator<Item = Result<Self, TableError>> + 'stmt, TableError>
67    where
68        Self: 'stmt,
69    {
70        let mapped = stmt.query_map(params, |row| Ok(Self::from_row(row)))?;
71        Ok(mapped.map(flatten_row))
72    }
73
74    /// Fetch exactly one row from `stmt`. Returns
75    /// [`TableError::QueryError`] if the row is missing or fails to
76    /// deserialize. Accepts both [`rusqlite::Statement`] and
77    /// [`rusqlite::CachedStatement`] (the latter via deref coercion).
78    fn row<P: Params>(stmt: &mut Statement<'_>, params: P) -> Result<Self, TableError> {
79        flatten_row(stmt.query_row(params, |row| Ok(Self::from_row(row))))
80    }
81
82    /// Process every row from the table's default `SELECT *` query using a
83    /// callback. Builds and discards the prepared statement internally, so
84    /// the caller never sees it.
85    ///
86    /// Use this for full-table scans where the callback style fits. For
87    /// custom statements (filters, joins, bound parameters), prepare the
88    /// statement yourself and iterate via [`Table::rows`]. See the
89    /// [`message`](crate::tables::messages::message) module docs for an
90    /// example.
91    ///
92    /// # Example
93    ///
94    /// ```no_run
95    /// use imessage_database::{
96    ///    error::table::TableError,
97    ///    tables::{
98    ///        table::{get_connection, Table},
99    ///        handle::Handle,
100    ///    },
101    ///    util::dirs::default_db_path
102    /// };
103    ///
104    /// let db_path = default_db_path();
105    /// let db = get_connection(&db_path).unwrap();
106    ///
107    /// // Stream the Handle table, processing each row with a callback
108    /// Handle::stream(&db, |handle_result| {
109    ///     match handle_result {
110    ///         Ok(handle) => println!("Handle: {}", handle.id),
111    ///         Err(e) => eprintln!("Error: {:?}", e),
112    ///     }
113    ///     Ok::<(), TableError>(())
114    /// }).unwrap();
115    /// ```
116    fn stream<F, E>(db: &Connection, callback: F) -> Result<(), E>
117    where
118        E: From<TableError>,
119        F: FnMut(Result<Self, TableError>) -> Result<(), E>,
120    {
121        stream_table_callback::<Self, F, E>(db, callback)
122    }
123
124    /// Open a `BLOB` column for the supplied `rowid`.
125    fn get_blob<'a>(
126        &self,
127        db: &'a Connection,
128        table: &str,
129        column: &str,
130        rowid: i64,
131    ) -> Option<Blob<'a>> {
132        db.blob_open(rusqlite::MAIN_DB, table, column, rowid, true)
133            .ok()
134    }
135
136    /// Return whether a `BLOB` column is non-null for the supplied `rowid`.
137    fn has_blob(&self, db: &Connection, table: &str, column: &str, rowid: i64) -> bool {
138        let sql = std::format!(
139            "SELECT ({column} IS NOT NULL) AS not_null
140         FROM {table}
141         WHERE rowid = ?1",
142        );
143
144        // This returns 1 for true, 0 for false.
145        db.query_row(&sql, [rowid], |row| row.get(0))
146            .ok()
147            .is_some_and(|v: i32| v != 0)
148    }
149}
150
151/// Flatten the doubly-nested result produced by `rusqlite::query_map` /
152/// `query_row` callbacks into a single [`TableError`]. The outer layer
153/// represents row-fetch failures, the inner layer represents row-deserialize
154/// failures from [`Table::from_row`].
155fn flatten_row<T>(item: Result<Result<T, Error>, Error>) -> Result<T, TableError> {
156    match item {
157        Ok(Ok(row)) => Ok(row),
158        Err(why) | Ok(Err(why)) => Err(TableError::QueryError(why)),
159    }
160}
161
162fn stream_table_callback<T, F, E>(db: &Connection, mut callback: F) -> Result<(), E>
163where
164    T: Table + Sized,
165    E: From<TableError>,
166    F: FnMut(Result<T, TableError>) -> Result<(), E>,
167{
168    let mut stmt = T::get(db).map_err(E::from)?;
169    for row_result in T::rows(&mut stmt, []).map_err(E::from)? {
170        callback(row_result)?;
171    }
172    Ok(())
173}
174
175/// Table data that can be materialized into an in-memory map.
176pub trait Cacheable {
177    /// Key type for the cache map.
178    type K;
179    /// Value type for the cache map.
180    type V;
181    /// Build the cache from the database.
182    fn cache(db: &Connection) -> Result<HashMap<Self::K, Self::V>, TableError>;
183}
184
185// MARK: Database
186/// Open the Messages `SQLite` database read-only.
187/// # Example:
188///
189/// ```
190/// use imessage_database::{
191///     util::dirs::default_db_path,
192///     tables::table::get_connection
193/// };
194///
195/// let db_path = default_db_path();
196/// let connection = get_connection(&db_path);
197/// ```
198pub fn get_connection(path: &Path) -> Result<Connection, TableError> {
199    if path.exists() && path.is_file() {
200        return match Connection::open_with_flags(
201            path,
202            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
203        ) {
204            Ok(connection) => {
205                // Read pages from the mapped region where SQLite supports it.
206                let _ = connection.pragma_update(None, "mmap_size", 8_589_934_592_i64); // up to 8 GiB
207                let _ = connection.pragma_update(None, "cache_size", -65_536_i64); // ~64 MiB
208                Ok(connection)
209            }
210            Err(why) => Err(TableError::CannotConnect(TableConnectError::Permissions(
211                why,
212            ))),
213        };
214    }
215
216    // Path does not point to a file
217    if path.exists() && !path.is_file() {
218        return Err(TableError::CannotConnect(TableConnectError::NotAFile(
219            path.to_path_buf(),
220        )));
221    }
222
223    // File is missing
224    Err(TableError::CannotConnect(TableConnectError::DoesNotExist(
225        path.to_path_buf(),
226    )))
227}
228
229/// Return the database file size on disk.
230/// # Example:
231///
232/// ```
233/// use imessage_database::{
234///     util::dirs::default_db_path,
235///     tables::table::get_db_size
236/// };
237///
238/// let db_path = default_db_path();
239/// let database_size_in_bytes = get_db_size(&db_path);
240/// ```
241pub fn get_db_size(path: &Path) -> Result<u64, TableError> {
242    Ok(metadata(path)?.len())
243}
244
// MARK: Constants
// Table names
/// Name of the handle table.
pub const HANDLE: &str = "handle";
/// Name of the message table.
pub const MESSAGE: &str = "message";
/// Name of the chat table.
pub const CHAT: &str = "chat";
/// Name of the attachment table.
pub const ATTACHMENT: &str = "attachment";
/// Name of the join table linking chats to messages.
pub const CHAT_MESSAGE_JOIN: &str = "chat_message_join";
/// Name of the join table linking messages to attachments.
pub const MESSAGE_ATTACHMENT_JOIN: &str = "message_attachment_join";
/// Name of the join table linking chats to handles.
pub const CHAT_HANDLE_JOIN: &str = "chat_handle_join";
/// Name of the table that tracks recently deleted messages.
pub const RECENTLY_DELETED: &str = "chat_recoverable_message_join";

// Column names
/// Column holding the [`plist`](crate::util::plist)-encoded app-message payload.
pub const MESSAGE_PAYLOAD: &str = "payload_data";
/// Column holding the [`plist`](crate::util::plist)-encoded message summary.
pub const MESSAGE_SUMMARY_INFO: &str = "message_summary_info";
/// Column holding the [`typedstream`](crate::util::typedstream)-encoded attributed body.
pub const ATTRIBUTED_BODY: &str = "attributedBody";
/// Column holding the [`plist`](crate::util::plist)-encoded sticker metadata.
pub const STICKER_USER_INFO: &str = "sticker_user_info";
/// Column holding the [`plist`](crate::util::plist)-encoded attachment attribution.
pub const ATTRIBUTION_INFO: &str = "attribution_info";
/// Column holding the [`plist`](crate::util::plist)-encoded chat properties.
pub const PROPERTIES: &str = "properties";

// Default information
/// Display name for the database owner, written in the first person.
pub const ME: &str = "Me";
/// Display name for the database owner, written in the second person.
pub const YOU: &str = "You";
/// Fallback display name for a contact or chat whose name is unavailable.
pub const UNKNOWN: &str = "Unknown";
/// Default location of the Messages database on macOS.
pub const DEFAULT_PATH_MACOS: &str = "Library/Messages/chat.db";
/// Default location of the Messages database inside an iOS backup.
pub const DEFAULT_PATH_IOS: &str = "3d/3d0d7e5fb2ce288813306e4d4636395e047a3d28";
/// Chat name reserved for messages that are not attached to any chat row.
pub const ORPHANED: &str = "orphaned";
/// Placeholder token that appears in Fitness.app messages and gets replaced.
pub const FITNESS_RECEIVER: &str = "$(kIMTranscriptPluginBreadcrumbTextReceiverIdentifier)";
/// Name of the attachments directory used in exports.
pub const ATTACHMENTS_DIR: &str = "attachments";
295
#[cfg(test)]
mod tests {
    use rusqlite::{CachedStatement, Connection, Result, Row};

    use crate::error::table::TableError;

    use super::Table;

    /// Minimal table model wrapping the single integer column of the test query.
    struct TestRow(i64);

    impl Table for TestRow {
        fn from_row(row: &Row) -> Result<Self> {
            row.get(0).map(Self)
        }

        fn get(db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
            // Three rows holding 1, 2 and 3; no real table is required.
            db.prepare_cached("SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3")
                .map_err(TableError::from)
        }
    }

    /// Caller-side error type, used to check the `From<TableError>` plumbing.
    #[derive(Debug)]
    enum StreamError {
        Table(TableError),
        Stop,
    }

    impl From<TableError> for StreamError {
        fn from(err: TableError) -> Self {
            Self::Table(err)
        }
    }

    /// An error returned by the callback ends the scan and is handed back.
    #[test]
    fn stream_propagates_callback_errors() {
        let db = Connection::open_in_memory().unwrap();
        let mut visited: Vec<i64> = Vec::new();

        let outcome = TestRow::stream(&db, |row| {
            let TestRow(value) = row.map_err(StreamError::from)?;
            visited.push(value);
            if value == 2 {
                Err(StreamError::Stop)
            } else {
                Ok(())
            }
        });

        assert!(matches!(outcome, Err(StreamError::Stop)));
        // The third row must never reach the callback.
        assert_eq!(visited, [1, 2]);
    }

    /// A failure while preparing the statement is converted into the caller's
    /// error type.
    #[test]
    fn stream_converts_setup_errors() {
        struct BrokenTable;

        impl Table for BrokenTable {
            fn from_row(_row: &Row) -> Result<Self> {
                Ok(Self)
            }

            fn get(_db: &'_ Connection) -> Result<CachedStatement<'_>, TableError> {
                let cause = std::io::Error::other("boom");
                Err(TableError::CannotRead(cause))
            }
        }

        let db = Connection::open_in_memory().unwrap();
        let outcome = BrokenTable::stream(&db, |_| Ok::<(), StreamError>(()));

        assert!(matches!(
            outcome,
            Err(StreamError::Table(TableError::CannotRead(_)))
        ));
    }
}
368}