sqlite_vtable_opendal/backends/
postgresql.rs1use crate::backends::StorageBackend;
8use crate::error::{Result, VTableError};
9use crate::types::{FileMetadata, QueryConfig};
10use async_trait::async_trait;
11use opendal::{services::Postgresql, Operator};
12use tokio_postgres::NoTls;
13
/// Storage backend that lists entries stored in a PostgreSQL table.
///
/// The settings held here are handed to the OpenDAL `Postgresql` service
/// builder, and the connection string is additionally used to open a direct
/// `tokio_postgres` connection for enumerating keys.
pub struct PostgresqlBackend {
    /// Connection string given to both the OpenDAL builder and
    /// `tokio_postgres::connect`.
    connection_string: String,
    /// Table name; given to the OpenDAL builder and interpolated as-is into
    /// the key-listing `SELECT`.
    table: String,
    /// Column holding each entry's key; selected (cast to text) when listing.
    key_field: String,
    /// Column holding each entry's value; forwarded to the OpenDAL builder.
    value_field: String,
}
46
47impl PostgresqlBackend {
48 pub fn new(
70 connection_string: impl Into<String>,
71 table: impl Into<String>,
72 key_field: impl Into<String>,
73 value_field: impl Into<String>,
74 ) -> Self {
75 Self {
76 connection_string: connection_string.into(),
77 table: table.into(),
78 key_field: key_field.into(),
79 value_field: value_field.into(),
80 }
81 }
82
83 fn create_operator(&self) -> Result<Operator> {
85 let builder = Postgresql::default()
86 .connection_string(&self.connection_string)
87 .table(&self.table)
88 .key_field(&self.key_field)
89 .value_field(&self.value_field);
90
91 Operator::new(builder)
92 .map(|op| op.finish())
93 .map_err(|e| VTableError::OpenDal(e))
94 }
95}
96
97#[async_trait]
98impl StorageBackend for PostgresqlBackend {
99 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
100 let operator = self.create_operator()?;
101 let mut results = Vec::new();
102
103 let (client, connection) = tokio_postgres::connect(&self.connection_string, NoTls)
105 .await
106 .map_err(|e| VTableError::Custom(format!("PostgreSQL connection error: {}", e)))?;
107
108 tokio::spawn(async move {
110 if let Err(e) = connection.await {
111 eprintln!("PostgreSQL connection error: {}", e);
112 }
113 });
114
115 let query = format!(
117 "SELECT {}::text FROM {}",
118 self.key_field, self.table
119 );
120
121 let rows = client
122 .query(&query, &[])
123 .await
124 .map_err(|e| VTableError::Custom(format!("PostgreSQL query error: {}", e)))?;
125
126 for row in rows {
128 let key: String = row.get(0);
129 let path = format!("/{}", key.trim_matches('/'));
130
131 let metadata_result = operator.stat(&path).await;
133
134 if let Ok(metadata) = metadata_result {
135 let content = if config.fetch_content {
137 operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
138 } else {
139 None
140 };
141
142 results.push(FileMetadata {
143 name: key.clone(),
144 path: path.clone(),
145 size: metadata.content_length(),
146 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
147 etag: metadata
148 .content_md5()
149 .map(|md5| md5.to_string()),
150 is_dir: false,
151 content_type: Some("application/octet-stream".to_string()),
152 content,
153 });
154
155 if let Some(limit) = config.limit {
157 if results.len() >= limit + config.offset {
158 break;
159 }
160 }
161 }
162 }
163
164 if config.offset > 0 && config.offset < results.len() {
166 results = results.into_iter().skip(config.offset).collect();
167 }
168
169 Ok(results)
170 }
171
172 fn backend_name(&self) -> &'static str {
173 "postgresql"
174 }
175}
176
177pub fn register(
210 conn: &rusqlite::Connection,
211 module_name: &str,
212 connection_string: impl Into<String>,
213 table: impl Into<String>,
214 key_field: impl Into<String>,
215 value_field: impl Into<String>,
216) -> rusqlite::Result<()> {
217 use crate::types::{columns, QueryConfig};
218 use rusqlite::{
219 ffi,
220 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
221 };
222 use std::os::raw::c_int;
223
224 let conn_str = connection_string.into();
225 let table_name = table.into();
226 let key_col = key_field.into();
227 let value_col = value_field.into();
228
229 #[repr(C)]
231 struct PostgresqlTable {
232 base: ffi::sqlite3_vtab,
233 connection_string: String,
234 table: String,
235 key_field: String,
236 value_field: String,
237 }
238
239 #[repr(C)]
241 struct PostgresqlCursor {
242 base: ffi::sqlite3_vtab_cursor,
243 files: Vec<crate::types::FileMetadata>,
244 current_row: usize,
245 connection_string: String,
246 table: String,
247 key_field: String,
248 value_field: String,
249 }
250
251 impl PostgresqlCursor {
252 fn new(
253 connection_string: String,
254 table: String,
255 key_field: String,
256 value_field: String,
257 ) -> Self {
258 Self {
259 base: ffi::sqlite3_vtab_cursor::default(),
260 files: Vec::new(),
261 current_row: 0,
262 connection_string,
263 table,
264 key_field,
265 value_field,
266 }
267 }
268 }
269
270 unsafe impl VTabCursor for PostgresqlCursor {
271 fn filter(
272 &mut self,
273 _idx_num: c_int,
274 _idx_str: Option<&str>,
275 _args: &vtab::Values<'_>,
276 ) -> rusqlite::Result<()> {
277 let backend = PostgresqlBackend::new(
279 &self.connection_string,
280 &self.table,
281 &self.key_field,
282 &self.value_field,
283 );
284 let config = QueryConfig::default();
285
286 let files = tokio::task::block_in_place(|| {
288 tokio::runtime::Handle::current().block_on(async {
289 backend.list_files(&config).await
290 })
291 })
292 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
293
294 self.files = files;
295 self.current_row = 0;
296 Ok(())
297 }
298
299 fn next(&mut self) -> rusqlite::Result<()> {
300 self.current_row += 1;
301 Ok(())
302 }
303
304 fn eof(&self) -> bool {
305 self.current_row >= self.files.len()
306 }
307
308 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
309 if self.current_row >= self.files.len() {
310 return Ok(());
311 }
312
313 let file = &self.files[self.current_row];
314
315 match col_index {
316 columns::PATH => ctx.set_result(&file.path),
317 columns::SIZE => ctx.set_result(&(file.size as i64)),
318 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
319 columns::ETAG => ctx.set_result(&file.etag),
320 columns::IS_DIR => ctx.set_result(&file.is_dir),
321 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
322 columns::NAME => ctx.set_result(&file.name),
323 columns::CONTENT => {
324 if let Some(ref content) = file.content {
325 ctx.set_result(&content.as_slice())
326 } else {
327 ctx.set_result::<Option<&[u8]>>(&None)
328 }
329 }
330 _ => Ok(()),
331 }
332 }
333
334 fn rowid(&self) -> rusqlite::Result<i64> {
335 Ok(self.current_row as i64)
336 }
337 }
338
339 impl vtab::CreateVTab<'_> for PostgresqlTable {
340 const KIND: VTabKind = VTabKind::EponymousOnly;
341 }
342
343 unsafe impl VTab<'_> for PostgresqlTable {
344 type Aux = (String, String, String, String);
345 type Cursor = PostgresqlCursor;
346
347 fn connect(
348 _db: &mut vtab::VTabConnection,
349 aux: Option<&Self::Aux>,
350 _args: &[&[u8]],
351 ) -> rusqlite::Result<(String, Self)> {
352 let schema = "
353 CREATE TABLE x(
354 path TEXT,
355 size INTEGER,
356 last_modified TEXT,
357 etag TEXT,
358 is_dir INTEGER,
359 content_type TEXT,
360 name TEXT,
361 content BLOB
362 )
363 ";
364
365 let (connection_string, table, key_field, value_field) =
366 if let Some((conn, tbl, key, val)) = aux {
367 (
368 conn.clone(),
369 tbl.clone(),
370 key.clone(),
371 val.clone(),
372 )
373 } else {
374 (
375 "".to_string(),
376 "".to_string(),
377 "key".to_string(),
378 "value".to_string(),
379 )
380 };
381
382 Ok((
383 schema.to_owned(),
384 PostgresqlTable {
385 base: ffi::sqlite3_vtab::default(),
386 connection_string,
387 table,
388 key_field,
389 value_field,
390 },
391 ))
392 }
393
394 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
395 info.set_estimated_cost(1000.0);
396 Ok(())
397 }
398
399 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
400 Ok(PostgresqlCursor::new(
401 self.connection_string.clone(),
402 self.table.clone(),
403 self.key_field.clone(),
404 self.value_field.clone(),
405 ))
406 }
407 }
408
409 conn.create_module(
410 module_name,
411 eponymous_only_module::<PostgresqlTable>(),
412 Some((conn_str, table_name, key_col, value_col)),
413 )
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores every argument unchanged and the backend reports its
    /// fixed name.
    #[test]
    fn test_backend_creation() {
        let backend =
            PostgresqlBackend::new("postgresql://localhost/test", "documents", "id", "data");

        assert_eq!(backend.backend_name(), "postgresql");
        assert_eq!(
            (
                backend.connection_string.as_str(),
                backend.table.as_str(),
                backend.key_field.as_str(),
                backend.value_field.as_str(),
            ),
            ("postgresql://localhost/test", "documents", "id", "data"),
        );
    }

    /// Custom key/value column names are kept as given.
    #[test]
    fn test_backend_with_different_fields() {
        let backend = PostgresqlBackend::new(
            "postgresql://user:pass@db.example.com:5432/mydb",
            "my_table",
            "key_column",
            "value_column",
        );

        assert_eq!(
            (backend.key_field.as_str(), backend.value_field.as_str()),
            ("key_column", "value_column"),
        );
    }
}