Skip to main content

sqlite_vtable_opendal/backends/
postgresql.rs

1//! PostgreSQL storage backend implementation
2//!
3//! This backend allows querying PostgreSQL database tables using SQL.
4//! It treats database records as "files" where the key column is the path
5//! and the value column is the content.
6
7use crate::backends::StorageBackend;
8use crate::error::{Result, VTableError};
9use crate::types::{FileMetadata, QueryConfig};
10use async_trait::async_trait;
11use opendal::{services::Postgresql, Operator};
12use tokio_postgres::NoTls;
13
14/// PostgreSQL storage backend
15///
16/// This backend uses OpenDAL's Postgresql service to query database tables.
17/// Each row is treated as a "file" where:
18/// - The key_field column becomes the file path
19/// - The value_field column becomes the file content
20///
21/// # Example
22///
23/// ```rust,ignore
24/// use sqlite_vtable_opendal::backends::postgresql::PostgresqlBackend;
25/// use sqlite_vtable_opendal::types::QueryConfig;
26///
27/// let backend = PostgresqlBackend::new(
28///     "postgresql://user:pass@localhost/db",
29///     "my_table",
30///     "id",
31///     "data"
32/// );
33/// let config = QueryConfig::default();
34/// let files = backend.list_files(&config).await?;
35/// ```
36pub struct PostgresqlBackend {
37    /// PostgreSQL connection string
38    connection_string: String,
39    /// Table name to query
40    table: String,
41    /// Column name for the key (becomes file path)
42    key_field: String,
43    /// Column name for the value (becomes file content)
44    value_field: String,
45}
46
47impl PostgresqlBackend {
48    /// Create a new PostgreSQL backend
49    ///
50    /// # Arguments
51    ///
52    /// * `connection_string` - PostgreSQL connection string
53    /// * `table` - Table name to query
54    /// * `key_field` - Column name for keys (default: "key")
55    /// * `value_field` - Column name for values (default: "value")
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use sqlite_vtable_opendal::backends::postgresql::PostgresqlBackend;
61    ///
62    /// let backend = PostgresqlBackend::new(
63    ///     "postgresql://localhost/mydb",
64    ///     "documents",
65    ///     "id",
66    ///     "content"
67    /// );
68    /// ```
69    pub fn new(
70        connection_string: impl Into<String>,
71        table: impl Into<String>,
72        key_field: impl Into<String>,
73        value_field: impl Into<String>,
74    ) -> Self {
75        Self {
76            connection_string: connection_string.into(),
77            table: table.into(),
78            key_field: key_field.into(),
79            value_field: value_field.into(),
80        }
81    }
82
83    /// Create an OpenDAL operator for PostgreSQL
84    fn create_operator(&self) -> Result<Operator> {
85        let builder = Postgresql::default()
86            .connection_string(&self.connection_string)
87            .table(&self.table)
88            .key_field(&self.key_field)
89            .value_field(&self.value_field);
90
91        Operator::new(builder)
92            .map(|op| op.finish())
93            .map_err(|e| VTableError::OpenDal(e))
94    }
95}
96
97#[async_trait]
98impl StorageBackend for PostgresqlBackend {
99    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
100        let operator = self.create_operator()?;
101        let mut results = Vec::new();
102
103        // Also query PostgreSQL directly to get all keys
104        let (client, connection) = tokio_postgres::connect(&self.connection_string, NoTls)
105            .await
106            .map_err(|e| VTableError::Custom(format!("PostgreSQL connection error: {}", e)))?;
107
108        // Spawn connection handler
109        tokio::spawn(async move {
110            if let Err(e) = connection.await {
111                eprintln!("PostgreSQL connection error: {}", e);
112            }
113        });
114
115        // Query all keys from the table
116        let query = format!(
117            "SELECT {}::text FROM {}",
118            self.key_field, self.table
119        );
120
121        let rows = client
122            .query(&query, &[])
123            .await
124            .map_err(|e| VTableError::Custom(format!("PostgreSQL query error: {}", e)))?;
125
126        // Fetch metadata for each key
127        for row in rows {
128            let key: String = row.get(0);
129            let path = format!("/{}", key.trim_matches('/'));
130
131            // Get metadata using OpenDAL
132            let metadata_result = operator.stat(&path).await;
133
134            if let Ok(metadata) = metadata_result {
135                // Optionally fetch content
136                let content = if config.fetch_content {
137                    operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
138                } else {
139                    None
140                };
141
142                results.push(FileMetadata {
143                    name: key.clone(),
144                    path: path.clone(),
145                    size: metadata.content_length(),
146                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
147                    etag: metadata
148                        .content_md5()
149                        .map(|md5| md5.to_string()),
150                    is_dir: false,
151                    content_type: Some("application/octet-stream".to_string()),
152                    content,
153                });
154
155                // Apply limit if specified
156                if let Some(limit) = config.limit {
157                    if results.len() >= limit + config.offset {
158                        break;
159                    }
160                }
161            }
162        }
163
164        // Apply offset
165        if config.offset > 0 && config.offset < results.len() {
166            results = results.into_iter().skip(config.offset).collect();
167        }
168
169        Ok(results)
170    }
171
172    fn backend_name(&self) -> &'static str {
173        "postgresql"
174    }
175}
176
177/// Register the postgresql virtual table with SQLite
178///
179/// This function creates a virtual table module that allows querying
180/// PostgreSQL database tables using SQL.
181///
182/// # Arguments
183///
184/// * `conn` - SQLite connection
185/// * `module_name` - Name for the virtual table (e.g., "pg_data")
186/// * `connection_string` - PostgreSQL connection string
187/// * `table` - Table name to query
188/// * `key_field` - Column name for keys
189/// * `value_field` - Column name for values
190///
191/// # Example
192///
193/// ```rust,ignore
194/// use rusqlite::Connection;
195/// use sqlite_vtable_opendal::backends::postgresql;
196///
197/// let conn = Connection::open_in_memory()?;
198/// postgresql::register(
199///     &conn,
200///     "pg_data",
201///     "postgresql://localhost/mydb",
202///     "documents",
203///     "id",
204///     "content"
205/// )?;
206///
207/// // Now you can query: SELECT * FROM pg_data
208/// ```
209pub fn register(
210    conn: &rusqlite::Connection,
211    module_name: &str,
212    connection_string: impl Into<String>,
213    table: impl Into<String>,
214    key_field: impl Into<String>,
215    value_field: impl Into<String>,
216) -> rusqlite::Result<()> {
217    use crate::types::{columns, QueryConfig};
218    use rusqlite::{
219        ffi,
220        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
221    };
222    use std::os::raw::c_int;
223
224    let conn_str = connection_string.into();
225    let table_name = table.into();
226    let key_col = key_field.into();
227    let value_col = value_field.into();
228
229    // Create a specific table type for PostgreSQL
230    #[repr(C)]
231    struct PostgresqlTable {
232        base: ffi::sqlite3_vtab,
233        connection_string: String,
234        table: String,
235        key_field: String,
236        value_field: String,
237    }
238
239    // Create a specific cursor type for PostgreSQL
240    #[repr(C)]
241    struct PostgresqlCursor {
242        base: ffi::sqlite3_vtab_cursor,
243        files: Vec<crate::types::FileMetadata>,
244        current_row: usize,
245        connection_string: String,
246        table: String,
247        key_field: String,
248        value_field: String,
249    }
250
251    impl PostgresqlCursor {
252        fn new(
253            connection_string: String,
254            table: String,
255            key_field: String,
256            value_field: String,
257        ) -> Self {
258            Self {
259                base: ffi::sqlite3_vtab_cursor::default(),
260                files: Vec::new(),
261                current_row: 0,
262                connection_string,
263                table,
264                key_field,
265                value_field,
266            }
267        }
268    }
269
270    unsafe impl VTabCursor for PostgresqlCursor {
271        fn filter(
272            &mut self,
273            _idx_num: c_int,
274            _idx_str: Option<&str>,
275            _args: &vtab::Values<'_>,
276        ) -> rusqlite::Result<()> {
277            // Create backend and fetch files
278            let backend = PostgresqlBackend::new(
279                &self.connection_string,
280                &self.table,
281                &self.key_field,
282                &self.value_field,
283            );
284            let config = QueryConfig::default();
285
286            // Fetch files from the backend (blocking the async call)
287            let files = tokio::task::block_in_place(|| {
288                tokio::runtime::Handle::current().block_on(async {
289                    backend.list_files(&config).await
290                })
291            })
292            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
293
294            self.files = files;
295            self.current_row = 0;
296            Ok(())
297        }
298
299        fn next(&mut self) -> rusqlite::Result<()> {
300            self.current_row += 1;
301            Ok(())
302        }
303
304        fn eof(&self) -> bool {
305            self.current_row >= self.files.len()
306        }
307
308        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
309            if self.current_row >= self.files.len() {
310                return Ok(());
311            }
312
313            let file = &self.files[self.current_row];
314
315            match col_index {
316                columns::PATH => ctx.set_result(&file.path),
317                columns::SIZE => ctx.set_result(&(file.size as i64)),
318                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
319                columns::ETAG => ctx.set_result(&file.etag),
320                columns::IS_DIR => ctx.set_result(&file.is_dir),
321                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
322                columns::NAME => ctx.set_result(&file.name),
323                columns::CONTENT => {
324                    if let Some(ref content) = file.content {
325                        ctx.set_result(&content.as_slice())
326                    } else {
327                        ctx.set_result::<Option<&[u8]>>(&None)
328                    }
329                }
330                _ => Ok(()),
331            }
332        }
333
334        fn rowid(&self) -> rusqlite::Result<i64> {
335            Ok(self.current_row as i64)
336        }
337    }
338
339    impl vtab::CreateVTab<'_> for PostgresqlTable {
340        const KIND: VTabKind = VTabKind::EponymousOnly;
341    }
342
343    unsafe impl VTab<'_> for PostgresqlTable {
344        type Aux = (String, String, String, String);
345        type Cursor = PostgresqlCursor;
346
347        fn connect(
348            _db: &mut vtab::VTabConnection,
349            aux: Option<&Self::Aux>,
350            _args: &[&[u8]],
351        ) -> rusqlite::Result<(String, Self)> {
352            let schema = "
353                CREATE TABLE x(
354                    path TEXT,
355                    size INTEGER,
356                    last_modified TEXT,
357                    etag TEXT,
358                    is_dir INTEGER,
359                    content_type TEXT,
360                    name TEXT,
361                    content BLOB
362                )
363            ";
364
365            let (connection_string, table, key_field, value_field) =
366                if let Some((conn, tbl, key, val)) = aux {
367                    (
368                        conn.clone(),
369                        tbl.clone(),
370                        key.clone(),
371                        val.clone(),
372                    )
373                } else {
374                    (
375                        "".to_string(),
376                        "".to_string(),
377                        "key".to_string(),
378                        "value".to_string(),
379                    )
380                };
381
382            Ok((
383                schema.to_owned(),
384                PostgresqlTable {
385                    base: ffi::sqlite3_vtab::default(),
386                    connection_string,
387                    table,
388                    key_field,
389                    value_field,
390                },
391            ))
392        }
393
394        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
395            info.set_estimated_cost(1000.0);
396            Ok(())
397        }
398
399        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
400            Ok(PostgresqlCursor::new(
401                self.connection_string.clone(),
402                self.table.clone(),
403                self.key_field.clone(),
404                self.value_field.clone(),
405            ))
406        }
407    }
408
409    conn.create_module(
410        module_name,
411        eponymous_only_module::<PostgresqlTable>(),
412        Some((conn_str, table_name, key_col, value_col)),
413    )
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419
420    #[test]
421    fn test_backend_creation() {
422        let backend = PostgresqlBackend::new(
423            "postgresql://localhost/test",
424            "documents",
425            "id",
426            "data",
427        );
428        assert_eq!(backend.connection_string, "postgresql://localhost/test");
429        assert_eq!(backend.table, "documents");
430        assert_eq!(backend.key_field, "id");
431        assert_eq!(backend.value_field, "data");
432        assert_eq!(backend.backend_name(), "postgresql");
433    }
434
435    #[test]
436    fn test_backend_with_different_fields() {
437        let backend = PostgresqlBackend::new(
438            "postgresql://user:pass@db.example.com:5432/mydb",
439            "my_table",
440            "key_column",
441            "value_column",
442        );
443        assert_eq!(backend.key_field, "key_column");
444        assert_eq!(backend.value_field, "value_column");
445    }
446
447    // Note: Integration tests with actual PostgreSQL would require a running database
448    // and are better suited for manual testing or CI with docker-compose
449}