sqlite_vtable_opendal/backends/
postgresql.rs1use crate::backends::StorageBackend;
8use crate::error::{Result, VTableError};
9use crate::types::{FileMetadata, QueryConfig};
10use async_trait::async_trait;
11use opendal::{services::Postgresql, Operator};
12use tokio_postgres::NoTls;
13
14pub struct PostgresqlBackend {
37 connection_string: String,
39 table: String,
41 key_field: String,
43 value_field: String,
45}
46
47impl PostgresqlBackend {
48 pub fn new(
70 connection_string: impl Into<String>,
71 table: impl Into<String>,
72 key_field: impl Into<String>,
73 value_field: impl Into<String>,
74 ) -> Self {
75 Self {
76 connection_string: connection_string.into(),
77 table: table.into(),
78 key_field: key_field.into(),
79 value_field: value_field.into(),
80 }
81 }
82
83 fn create_operator(&self) -> Result<Operator> {
85 let builder = Postgresql::default()
86 .connection_string(&self.connection_string)
87 .table(&self.table)
88 .key_field(&self.key_field)
89 .value_field(&self.value_field);
90
91 Ok(Operator::new(builder)?
92 .finish())
93 }
94}
95
96#[async_trait]
97impl StorageBackend for PostgresqlBackend {
98 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
99 let operator = self.create_operator()?;
100 let mut results = Vec::new();
101
102 let (client, connection) = tokio_postgres::connect(&self.connection_string, NoTls)
104 .await
105 .map_err(|e| VTableError::Custom(format!("PostgreSQL connection error: {}", e)))?;
106
107 tokio::spawn(async move {
109 if let Err(e) = connection.await {
110 eprintln!("PostgreSQL connection error: {}", e);
111 }
112 });
113
114 let query = format!(
116 "SELECT {}::text FROM {}",
117 self.key_field, self.table
118 );
119
120 let rows = client
121 .query(&query, &[])
122 .await
123 .map_err(|e| VTableError::Custom(format!("PostgreSQL query error: {}", e)))?;
124
125 for row in rows {
127 let key: String = row.get(0);
128 let path = format!("/{}", key.trim_matches('/'));
129
130 let metadata_result = operator.stat(&path).await;
132
133 if let Ok(metadata) = metadata_result {
134 let content = if config.fetch_content {
136 operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
137 } else {
138 None
139 };
140
141 results.push(FileMetadata {
142 name: key.clone(),
143 path: path.clone(),
144 size: metadata.content_length(),
145 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
146 etag: metadata
147 .content_md5()
148 .map(|md5| md5.to_string()),
149 is_dir: false,
150 content_type: Some("application/octet-stream".to_string()),
151 content,
152 });
153
154 if let Some(limit) = config.limit {
156 if results.len() >= limit + config.offset {
157 break;
158 }
159 }
160 }
161 }
162
163 if config.offset > 0 && config.offset < results.len() {
165 results = results.into_iter().skip(config.offset).collect();
166 }
167
168 Ok(results)
169 }
170
171 fn backend_name(&self) -> &'static str {
172 "postgresql"
173 }
174}
175
176pub fn register(
209 conn: &rusqlite::Connection,
210 module_name: &str,
211 connection_string: impl Into<String>,
212 table: impl Into<String>,
213 key_field: impl Into<String>,
214 value_field: impl Into<String>,
215) -> rusqlite::Result<()> {
216 use crate::types::{columns, QueryConfig};
217 use rusqlite::{
218 ffi,
219 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
220 };
221 use std::os::raw::c_int;
222
223 let conn_str = connection_string.into();
224 let table_name = table.into();
225 let key_col = key_field.into();
226 let value_col = value_field.into();
227
228 #[repr(C)]
230 struct PostgresqlTable {
231 base: ffi::sqlite3_vtab,
232 connection_string: String,
233 table: String,
234 key_field: String,
235 value_field: String,
236 }
237
238 #[repr(C)]
240 struct PostgresqlCursor {
241 base: ffi::sqlite3_vtab_cursor,
242 files: Vec<crate::types::FileMetadata>,
243 current_row: usize,
244 connection_string: String,
245 table: String,
246 key_field: String,
247 value_field: String,
248 }
249
250 impl PostgresqlCursor {
251 fn new(
252 connection_string: String,
253 table: String,
254 key_field: String,
255 value_field: String,
256 ) -> Self {
257 Self {
258 base: ffi::sqlite3_vtab_cursor::default(),
259 files: Vec::new(),
260 current_row: 0,
261 connection_string,
262 table,
263 key_field,
264 value_field,
265 }
266 }
267 }
268
269 unsafe impl VTabCursor for PostgresqlCursor {
270 fn filter(
271 &mut self,
272 _idx_num: c_int,
273 _idx_str: Option<&str>,
274 _args: &vtab::Values<'_>,
275 ) -> rusqlite::Result<()> {
276 let backend = PostgresqlBackend::new(
278 &self.connection_string,
279 &self.table,
280 &self.key_field,
281 &self.value_field,
282 );
283 let config = QueryConfig::default();
284
285 let files = tokio::task::block_in_place(|| {
287 tokio::runtime::Handle::current().block_on(async {
288 backend.list_files(&config).await
289 })
290 })
291 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
292
293 self.files = files;
294 self.current_row = 0;
295 Ok(())
296 }
297
298 fn next(&mut self) -> rusqlite::Result<()> {
299 self.current_row += 1;
300 Ok(())
301 }
302
303 fn eof(&self) -> bool {
304 self.current_row >= self.files.len()
305 }
306
307 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
308 if self.current_row >= self.files.len() {
309 return Ok(());
310 }
311
312 let file = &self.files[self.current_row];
313
314 match col_index {
315 columns::PATH => ctx.set_result(&file.path),
316 columns::SIZE => ctx.set_result(&(file.size as i64)),
317 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
318 columns::ETAG => ctx.set_result(&file.etag),
319 columns::IS_DIR => ctx.set_result(&file.is_dir),
320 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
321 columns::NAME => ctx.set_result(&file.name),
322 columns::CONTENT => {
323 if let Some(ref content) = file.content {
324 ctx.set_result(&content.as_slice())
325 } else {
326 ctx.set_result::<Option<&[u8]>>(&None)
327 }
328 }
329 _ => Ok(()),
330 }
331 }
332
333 fn rowid(&self) -> rusqlite::Result<i64> {
334 Ok(self.current_row as i64)
335 }
336 }
337
338 impl vtab::CreateVTab<'_> for PostgresqlTable {
339 const KIND: VTabKind = VTabKind::EponymousOnly;
340 }
341
342 unsafe impl VTab<'_> for PostgresqlTable {
343 type Aux = (String, String, String, String);
344 type Cursor = PostgresqlCursor;
345
346 fn connect(
347 _db: &mut vtab::VTabConnection,
348 aux: Option<&Self::Aux>,
349 _args: &[&[u8]],
350 ) -> rusqlite::Result<(String, Self)> {
351 let schema = "
352 CREATE TABLE x(
353 path TEXT,
354 size INTEGER,
355 last_modified TEXT,
356 etag TEXT,
357 is_dir INTEGER,
358 content_type TEXT,
359 name TEXT,
360 content BLOB
361 )
362 ";
363
364 let (connection_string, table, key_field, value_field) =
365 if let Some((conn, tbl, key, val)) = aux {
366 (
367 conn.clone(),
368 tbl.clone(),
369 key.clone(),
370 val.clone(),
371 )
372 } else {
373 (
374 "".to_string(),
375 "".to_string(),
376 "key".to_string(),
377 "value".to_string(),
378 )
379 };
380
381 Ok((
382 schema.to_owned(),
383 PostgresqlTable {
384 base: ffi::sqlite3_vtab::default(),
385 connection_string,
386 table,
387 key_field,
388 value_field,
389 },
390 ))
391 }
392
393 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
394 info.set_estimated_cost(1000.0);
395 Ok(())
396 }
397
398 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
399 Ok(PostgresqlCursor::new(
400 self.connection_string.clone(),
401 self.table.clone(),
402 self.key_field.clone(),
403 self.value_field.clone(),
404 ))
405 }
406 }
407
408 conn.create_module(
409 module_name,
410 eponymous_only_module::<PostgresqlTable>(),
411 Some((conn_str, table_name, key_col, value_col)),
412 )
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418
419 #[test]
420 fn test_backend_creation() {
421 let backend = PostgresqlBackend::new(
422 "postgresql://localhost/test",
423 "documents",
424 "id",
425 "data",
426 );
427 assert_eq!(backend.connection_string, "postgresql://localhost/test");
428 assert_eq!(backend.table, "documents");
429 assert_eq!(backend.key_field, "id");
430 assert_eq!(backend.value_field, "data");
431 assert_eq!(backend.backend_name(), "postgresql");
432 }
433
434 #[test]
435 fn test_backend_with_different_fields() {
436 let backend = PostgresqlBackend::new(
437 "postgresql://user:pass@db.example.com:5432/mydb",
438 "my_table",
439 "key_column",
440 "value_column",
441 );
442 assert_eq!(backend.key_field, "key_column");
443 assert_eq!(backend.value_field, "value_column");
444 }
445
446 }