Skip to main content

sqlite_vtable_opendal/backends/
postgresql.rs

1//! PostgreSQL storage backend implementation
2//!
3//! This backend allows querying PostgreSQL database tables using SQL.
4//! It treats database records as "files" where the key column is the path
5//! and the value column is the content.
6
7use crate::backends::StorageBackend;
8use crate::error::{Result, VTableError};
9use crate::types::{FileMetadata, QueryConfig};
10use async_trait::async_trait;
11use opendal::{services::Postgresql, Operator};
12use tokio_postgres::NoTls;
13
14/// PostgreSQL storage backend
15///
16/// This backend uses OpenDAL's Postgresql service to query database tables.
17/// Each row is treated as a "file" where:
18/// - The key_field column becomes the file path
19/// - The value_field column becomes the file content
20///
21/// # Example
22///
23/// ```rust,ignore
24/// use sqlite_vtable_opendal::backends::postgresql::PostgresqlBackend;
25/// use sqlite_vtable_opendal::types::QueryConfig;
26///
27/// let backend = PostgresqlBackend::new(
28///     "postgresql://user:pass@localhost/db",
29///     "my_table",
30///     "id",
31///     "data"
32/// );
33/// let config = QueryConfig::default();
34/// let files = backend.list_files(&config).await?;
35/// ```
36pub struct PostgresqlBackend {
37    /// PostgreSQL connection string
38    connection_string: String,
39    /// Table name to query
40    table: String,
41    /// Column name for the key (becomes file path)
42    key_field: String,
43    /// Column name for the value (becomes file content)
44    value_field: String,
45}
46
47impl PostgresqlBackend {
48    /// Create a new PostgreSQL backend
49    ///
50    /// # Arguments
51    ///
52    /// * `connection_string` - PostgreSQL connection string
53    /// * `table` - Table name to query
54    /// * `key_field` - Column name for keys (default: "key")
55    /// * `value_field` - Column name for values (default: "value")
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use sqlite_vtable_opendal::backends::postgresql::PostgresqlBackend;
61    ///
62    /// let backend = PostgresqlBackend::new(
63    ///     "postgresql://localhost/mydb",
64    ///     "documents",
65    ///     "id",
66    ///     "content"
67    /// );
68    /// ```
69    pub fn new(
70        connection_string: impl Into<String>,
71        table: impl Into<String>,
72        key_field: impl Into<String>,
73        value_field: impl Into<String>,
74    ) -> Self {
75        Self {
76            connection_string: connection_string.into(),
77            table: table.into(),
78            key_field: key_field.into(),
79            value_field: value_field.into(),
80        }
81    }
82
83    /// Create an OpenDAL operator for PostgreSQL
84    fn create_operator(&self) -> Result<Operator> {
85        let builder = Postgresql::default()
86            .connection_string(&self.connection_string)
87            .table(&self.table)
88            .key_field(&self.key_field)
89            .value_field(&self.value_field);
90
91        Ok(Operator::new(builder)?
92            .finish())
93    }
94}
95
96#[async_trait]
97impl StorageBackend for PostgresqlBackend {
98    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
99        let operator = self.create_operator()?;
100        let mut results = Vec::new();
101
102        // Also query PostgreSQL directly to get all keys
103        let (client, connection) = tokio_postgres::connect(&self.connection_string, NoTls)
104            .await
105            .map_err(|e| VTableError::Custom(format!("PostgreSQL connection error: {}", e)))?;
106
107        // Spawn connection handler
108        tokio::spawn(async move {
109            if let Err(e) = connection.await {
110                eprintln!("PostgreSQL connection error: {}", e);
111            }
112        });
113
114        // Query all keys from the table
115        let query = format!(
116            "SELECT {}::text FROM {}",
117            self.key_field, self.table
118        );
119
120        let rows = client
121            .query(&query, &[])
122            .await
123            .map_err(|e| VTableError::Custom(format!("PostgreSQL query error: {}", e)))?;
124
125        // Fetch metadata for each key
126        for row in rows {
127            let key: String = row.get(0);
128            let path = format!("/{}", key.trim_matches('/'));
129
130            // Get metadata using OpenDAL
131            let metadata_result = operator.stat(&path).await;
132
133            if let Ok(metadata) = metadata_result {
134                // Optionally fetch content
135                let content = if config.fetch_content {
136                    operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
137                } else {
138                    None
139                };
140
141                results.push(FileMetadata {
142                    name: key.clone(),
143                    path: path.clone(),
144                    size: metadata.content_length(),
145                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
146                    etag: metadata
147                        .content_md5()
148                        .map(|md5| md5.to_string()),
149                    is_dir: false,
150                    content_type: Some("application/octet-stream".to_string()),
151                    content,
152                });
153
154                // Apply limit if specified
155                if let Some(limit) = config.limit {
156                    if results.len() >= limit + config.offset {
157                        break;
158                    }
159                }
160            }
161        }
162
163        // Apply offset
164        if config.offset > 0 && config.offset < results.len() {
165            results = results.into_iter().skip(config.offset).collect();
166        }
167
168        Ok(results)
169    }
170
171    fn backend_name(&self) -> &'static str {
172        "postgresql"
173    }
174}
175
176/// Register the postgresql virtual table with SQLite
177///
178/// This function creates a virtual table module that allows querying
179/// PostgreSQL database tables using SQL.
180///
181/// # Arguments
182///
183/// * `conn` - SQLite connection
184/// * `module_name` - Name for the virtual table (e.g., "pg_data")
185/// * `connection_string` - PostgreSQL connection string
186/// * `table` - Table name to query
187/// * `key_field` - Column name for keys
188/// * `value_field` - Column name for values
189///
190/// # Example
191///
192/// ```rust,ignore
193/// use rusqlite::Connection;
194/// use sqlite_vtable_opendal::backends::postgresql;
195///
196/// let conn = Connection::open_in_memory()?;
197/// postgresql::register(
198///     &conn,
199///     "pg_data",
200///     "postgresql://localhost/mydb",
201///     "documents",
202///     "id",
203///     "content"
204/// )?;
205///
206/// // Now you can query: SELECT * FROM pg_data
207/// ```
208pub fn register(
209    conn: &rusqlite::Connection,
210    module_name: &str,
211    connection_string: impl Into<String>,
212    table: impl Into<String>,
213    key_field: impl Into<String>,
214    value_field: impl Into<String>,
215) -> rusqlite::Result<()> {
216    use crate::types::{columns, QueryConfig};
217    use rusqlite::{
218        ffi,
219        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
220    };
221    use std::os::raw::c_int;
222
223    let conn_str = connection_string.into();
224    let table_name = table.into();
225    let key_col = key_field.into();
226    let value_col = value_field.into();
227
228    // Create a specific table type for PostgreSQL
229    #[repr(C)]
230    struct PostgresqlTable {
231        base: ffi::sqlite3_vtab,
232        connection_string: String,
233        table: String,
234        key_field: String,
235        value_field: String,
236    }
237
238    // Create a specific cursor type for PostgreSQL
239    #[repr(C)]
240    struct PostgresqlCursor {
241        base: ffi::sqlite3_vtab_cursor,
242        files: Vec<crate::types::FileMetadata>,
243        current_row: usize,
244        connection_string: String,
245        table: String,
246        key_field: String,
247        value_field: String,
248    }
249
250    impl PostgresqlCursor {
251        fn new(
252            connection_string: String,
253            table: String,
254            key_field: String,
255            value_field: String,
256        ) -> Self {
257            Self {
258                base: ffi::sqlite3_vtab_cursor::default(),
259                files: Vec::new(),
260                current_row: 0,
261                connection_string,
262                table,
263                key_field,
264                value_field,
265            }
266        }
267    }
268
269    unsafe impl VTabCursor for PostgresqlCursor {
270        fn filter(
271            &mut self,
272            _idx_num: c_int,
273            _idx_str: Option<&str>,
274            _args: &vtab::Values<'_>,
275        ) -> rusqlite::Result<()> {
276            // Create backend and fetch files
277            let backend = PostgresqlBackend::new(
278                &self.connection_string,
279                &self.table,
280                &self.key_field,
281                &self.value_field,
282            );
283            let config = QueryConfig::default();
284
285            // Fetch files from the backend (blocking the async call)
286            let files = tokio::task::block_in_place(|| {
287                tokio::runtime::Handle::current().block_on(async {
288                    backend.list_files(&config).await
289                })
290            })
291            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
292
293            self.files = files;
294            self.current_row = 0;
295            Ok(())
296        }
297
298        fn next(&mut self) -> rusqlite::Result<()> {
299            self.current_row += 1;
300            Ok(())
301        }
302
303        fn eof(&self) -> bool {
304            self.current_row >= self.files.len()
305        }
306
307        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
308            if self.current_row >= self.files.len() {
309                return Ok(());
310            }
311
312            let file = &self.files[self.current_row];
313
314            match col_index {
315                columns::PATH => ctx.set_result(&file.path),
316                columns::SIZE => ctx.set_result(&(file.size as i64)),
317                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
318                columns::ETAG => ctx.set_result(&file.etag),
319                columns::IS_DIR => ctx.set_result(&file.is_dir),
320                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
321                columns::NAME => ctx.set_result(&file.name),
322                columns::CONTENT => {
323                    if let Some(ref content) = file.content {
324                        ctx.set_result(&content.as_slice())
325                    } else {
326                        ctx.set_result::<Option<&[u8]>>(&None)
327                    }
328                }
329                _ => Ok(()),
330            }
331        }
332
333        fn rowid(&self) -> rusqlite::Result<i64> {
334            Ok(self.current_row as i64)
335        }
336    }
337
338    impl vtab::CreateVTab<'_> for PostgresqlTable {
339        const KIND: VTabKind = VTabKind::EponymousOnly;
340    }
341
342    unsafe impl VTab<'_> for PostgresqlTable {
343        type Aux = (String, String, String, String);
344        type Cursor = PostgresqlCursor;
345
346        fn connect(
347            _db: &mut vtab::VTabConnection,
348            aux: Option<&Self::Aux>,
349            _args: &[&[u8]],
350        ) -> rusqlite::Result<(String, Self)> {
351            let schema = "
352                CREATE TABLE x(
353                    path TEXT,
354                    size INTEGER,
355                    last_modified TEXT,
356                    etag TEXT,
357                    is_dir INTEGER,
358                    content_type TEXT,
359                    name TEXT,
360                    content BLOB
361                )
362            ";
363
364            let (connection_string, table, key_field, value_field) =
365                if let Some((conn, tbl, key, val)) = aux {
366                    (
367                        conn.clone(),
368                        tbl.clone(),
369                        key.clone(),
370                        val.clone(),
371                    )
372                } else {
373                    (
374                        "".to_string(),
375                        "".to_string(),
376                        "key".to_string(),
377                        "value".to_string(),
378                    )
379                };
380
381            Ok((
382                schema.to_owned(),
383                PostgresqlTable {
384                    base: ffi::sqlite3_vtab::default(),
385                    connection_string,
386                    table,
387                    key_field,
388                    value_field,
389                },
390            ))
391        }
392
393        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
394            info.set_estimated_cost(1000.0);
395            Ok(())
396        }
397
398        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
399            Ok(PostgresqlCursor::new(
400                self.connection_string.clone(),
401                self.table.clone(),
402                self.key_field.clone(),
403                self.value_field.clone(),
404            ))
405        }
406    }
407
408    conn.create_module(
409        module_name,
410        eponymous_only_module::<PostgresqlTable>(),
411        Some((conn_str, table_name, key_col, value_col)),
412    )
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418
419    #[test]
420    fn test_backend_creation() {
421        let backend = PostgresqlBackend::new(
422            "postgresql://localhost/test",
423            "documents",
424            "id",
425            "data",
426        );
427        assert_eq!(backend.connection_string, "postgresql://localhost/test");
428        assert_eq!(backend.table, "documents");
429        assert_eq!(backend.key_field, "id");
430        assert_eq!(backend.value_field, "data");
431        assert_eq!(backend.backend_name(), "postgresql");
432    }
433
434    #[test]
435    fn test_backend_with_different_fields() {
436        let backend = PostgresqlBackend::new(
437            "postgresql://user:pass@db.example.com:5432/mydb",
438            "my_table",
439            "key_column",
440            "value_column",
441        );
442        assert_eq!(backend.key_field, "key_column");
443        assert_eq!(backend.value_field, "value_column");
444    }
445
446    // Note: Integration tests with actual PostgreSQL would require a running database
447    // and are better suited for manual testing or CI with docker-compose
448}