sqlite_vtable_opendal/backends/
http.rs1use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use opendal::{services::Http, Operator};
11
12pub struct HttpBackend {
28 endpoint: String,
30}
31
32impl HttpBackend {
33 pub fn new(endpoint: impl Into<String>) -> Self {
47 Self {
48 endpoint: endpoint.into(),
49 }
50 }
51
52 fn create_operator(&self) -> Result<Operator> {
54 let builder = Http::default().endpoint(&self.endpoint);
55
56 Operator::new(builder)
57 .map(|op| op.finish())
58 .map_err(|e| VTableError::OpenDal(e))
59 }
60}
61
62#[async_trait]
63impl StorageBackend for HttpBackend {
64 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
65 let operator = self.create_operator()?;
66 let mut results = Vec::new();
67
68 let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
70 "".to_string()
71 } else {
72 config.root_path.trim_matches('/').to_string()
73 };
74
75 let path = if normalized_path.is_empty() {
78 "".to_string()
79 } else {
80 normalized_path.clone()
81 };
82
83 match operator.stat(&path).await {
84 Ok(metadata) => {
85 let content = if config.fetch_content {
87 operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
88 } else {
89 None
90 };
91
92 let name = if path.is_empty() {
94 "index".to_string()
95 } else {
96 path.split('/').last().unwrap_or(&path).to_string()
97 };
98
99 results.push(FileMetadata {
100 name,
101 path: if path.is_empty() {
102 "/".to_string()
103 } else {
104 format!("/{}", path)
105 },
106 size: metadata.content_length(),
107 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
108 etag: metadata.etag().map(|e| e.to_string()),
109 is_dir: false,
110 content_type: metadata.content_type().map(|ct| ct.to_string()),
111 content,
112 });
113 }
114 Err(_) => {
115 if let Ok(bytes) = operator.read(&path).await {
117 let name = if path.is_empty() {
118 "index".to_string()
119 } else {
120 path.split('/').last().unwrap_or(&path).to_string()
121 };
122
123 let content_data = bytes.to_vec();
124 let size = content_data.len() as u64;
125
126 results.push(FileMetadata {
127 name,
128 path: if path.is_empty() {
129 "/".to_string()
130 } else {
131 format!("/{}", path)
132 },
133 size,
134 last_modified: None,
135 etag: None,
136 is_dir: false,
137 content_type: Some("application/octet-stream".to_string()),
138 content: if config.fetch_content {
139 Some(content_data)
140 } else {
141 None
142 },
143 });
144 }
145 }
146 }
147
148 Ok(results)
149 }
150
151 fn backend_name(&self) -> &'static str {
152 "http"
153 }
154}
155
156pub fn register(
179 conn: &rusqlite::Connection,
180 module_name: &str,
181 endpoint: impl Into<String>,
182) -> rusqlite::Result<()> {
183 use crate::types::{columns, QueryConfig};
184 use rusqlite::{
185 ffi,
186 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
187 };
188 use std::os::raw::c_int;
189
190 let endpoint_str = endpoint.into();
191
192 #[repr(C)]
194 struct HttpTable {
195 base: ffi::sqlite3_vtab,
196 endpoint: String,
197 }
198
199 #[repr(C)]
201 struct HttpCursor {
202 base: ffi::sqlite3_vtab_cursor,
203 files: Vec<crate::types::FileMetadata>,
204 current_row: usize,
205 endpoint: String,
206 }
207
208 impl HttpCursor {
209 fn new(endpoint: String) -> Self {
210 Self {
211 base: ffi::sqlite3_vtab_cursor::default(),
212 files: Vec::new(),
213 current_row: 0,
214 endpoint,
215 }
216 }
217 }
218
219 unsafe impl VTabCursor for HttpCursor {
220 fn filter(
221 &mut self,
222 _idx_num: c_int,
223 _idx_str: Option<&str>,
224 _args: &vtab::Values<'_>,
225 ) -> rusqlite::Result<()> {
226 let backend = HttpBackend::new(&self.endpoint);
228 let config = QueryConfig::default();
229
230 let files = tokio::task::block_in_place(|| {
232 tokio::runtime::Handle::current().block_on(async {
233 backend.list_files(&config).await
234 })
235 })
236 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
237
238 self.files = files;
239 self.current_row = 0;
240 Ok(())
241 }
242
243 fn next(&mut self) -> rusqlite::Result<()> {
244 self.current_row += 1;
245 Ok(())
246 }
247
248 fn eof(&self) -> bool {
249 self.current_row >= self.files.len()
250 }
251
252 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
253 if self.current_row >= self.files.len() {
254 return Ok(());
255 }
256
257 let file = &self.files[self.current_row];
258
259 match col_index {
260 columns::PATH => ctx.set_result(&file.path),
261 columns::SIZE => ctx.set_result(&(file.size as i64)),
262 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
263 columns::ETAG => ctx.set_result(&file.etag),
264 columns::IS_DIR => ctx.set_result(&file.is_dir),
265 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
266 columns::NAME => ctx.set_result(&file.name),
267 columns::CONTENT => {
268 if let Some(ref content) = file.content {
269 ctx.set_result(&content.as_slice())
270 } else {
271 ctx.set_result::<Option<&[u8]>>(&None)
272 }
273 }
274 _ => Ok(()),
275 }
276 }
277
278 fn rowid(&self) -> rusqlite::Result<i64> {
279 Ok(self.current_row as i64)
280 }
281 }
282
283 impl vtab::CreateVTab<'_> for HttpTable {
284 const KIND: VTabKind = VTabKind::EponymousOnly;
285 }
286
287 unsafe impl VTab<'_> for HttpTable {
288 type Aux = String;
289 type Cursor = HttpCursor;
290
291 fn connect(
292 _db: &mut vtab::VTabConnection,
293 aux: Option<&Self::Aux>,
294 _args: &[&[u8]],
295 ) -> rusqlite::Result<(String, Self)> {
296 let schema = "
297 CREATE TABLE x(
298 path TEXT,
299 size INTEGER,
300 last_modified TEXT,
301 etag TEXT,
302 is_dir INTEGER,
303 content_type TEXT,
304 name TEXT,
305 content BLOB
306 )
307 ";
308
309 let endpoint = if let Some(ep) = aux {
310 ep.clone()
311 } else {
312 "".to_string()
313 };
314
315 Ok((
316 schema.to_owned(),
317 HttpTable {
318 base: ffi::sqlite3_vtab::default(),
319 endpoint,
320 },
321 ))
322 }
323
324 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
325 info.set_estimated_cost(100.0);
326 Ok(())
327 }
328
329 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
330 Ok(HttpCursor::new(self.endpoint.clone()))
331 }
332 }
333
334 conn.create_module(
335 module_name,
336 eponymous_only_module::<HttpTable>(),
337 Some(endpoint_str),
338 )
339}
340
341#[cfg(test)]
342mod tests {
343 use super::*;
344
345 #[test]
346 fn test_backend_creation() {
347 let backend = HttpBackend::new("https://api.example.com/data");
348 assert_eq!(backend.endpoint, "https://api.example.com/data");
349 assert_eq!(backend.backend_name(), "http");
350 }
351
352 #[test]
353 fn test_backend_with_different_endpoints() {
354 let backend = HttpBackend::new("http://localhost:8080");
355 assert_eq!(backend.endpoint, "http://localhost:8080");
356 }
357
358 }