sqlite_vtable_opendal/backends/
http.rs1use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use opendal::{services::Http, Operator};
11
/// Storage backend that reads a resource from an HTTP(S) endpoint via OpenDAL.
///
/// The backend only stores the endpoint string; an operator is built on demand
/// each time files are listed.
#[derive(Debug, Clone)]
pub struct HttpBackend {
    /// Endpoint URL handed to the OpenDAL `Http` service builder.
    endpoint: String,
}
31
32impl HttpBackend {
33 pub fn new(endpoint: impl Into<String>) -> Self {
47 Self {
48 endpoint: endpoint.into(),
49 }
50 }
51
52 fn create_operator(&self) -> Result<Operator> {
54 let builder = Http::default().endpoint(&self.endpoint);
55
56 Ok(Operator::new(builder)?
57 .finish())
58 }
59}
60
61#[async_trait]
62impl StorageBackend for HttpBackend {
63 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
64 let operator = self.create_operator()?;
65 let mut results = Vec::new();
66
67 let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
69 "".to_string()
70 } else {
71 config.root_path.trim_matches('/').to_string()
72 };
73
74 let path = if normalized_path.is_empty() {
77 "".to_string()
78 } else {
79 normalized_path.clone()
80 };
81
82 match operator.stat(&path).await {
83 Ok(metadata) => {
84 let content = if config.fetch_content {
86 operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
87 } else {
88 None
89 };
90
91 let name = if path.is_empty() {
93 "index".to_string()
94 } else {
95 path.split('/').next_back().unwrap_or(&path).to_string()
96 };
97
98 results.push(FileMetadata {
99 name,
100 path: if path.is_empty() {
101 "/".to_string()
102 } else {
103 format!("/{}", path)
104 },
105 size: metadata.content_length(),
106 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
107 etag: metadata.etag().map(|e| e.to_string()),
108 is_dir: false,
109 content_type: metadata.content_type().map(|ct| ct.to_string()),
110 content,
111 });
112 }
113 Err(_) => {
114 if let Ok(bytes) = operator.read(&path).await {
116 let name = if path.is_empty() {
117 "index".to_string()
118 } else {
119 path.split('/').next_back().unwrap_or(&path).to_string()
120 };
121
122 let content_data = bytes.to_vec();
123 let size = content_data.len() as u64;
124
125 results.push(FileMetadata {
126 name,
127 path: if path.is_empty() {
128 "/".to_string()
129 } else {
130 format!("/{}", path)
131 },
132 size,
133 last_modified: None,
134 etag: None,
135 is_dir: false,
136 content_type: Some("application/octet-stream".to_string()),
137 content: if config.fetch_content {
138 Some(content_data)
139 } else {
140 None
141 },
142 });
143 }
144 }
145 }
146
147 Ok(results)
148 }
149
150 fn backend_name(&self) -> &'static str {
151 "http"
152 }
153}
154
155pub fn register(
178 conn: &rusqlite::Connection,
179 module_name: &str,
180 endpoint: impl Into<String>,
181) -> rusqlite::Result<()> {
182 use crate::types::{columns, QueryConfig};
183 use rusqlite::{
184 ffi,
185 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
186 };
187 use std::os::raw::c_int;
188
189 let endpoint_str = endpoint.into();
190
191 #[repr(C)]
193 struct HttpTable {
194 base: ffi::sqlite3_vtab,
195 endpoint: String,
196 }
197
198 #[repr(C)]
200 struct HttpCursor {
201 base: ffi::sqlite3_vtab_cursor,
202 files: Vec<crate::types::FileMetadata>,
203 current_row: usize,
204 endpoint: String,
205 }
206
207 impl HttpCursor {
208 fn new(endpoint: String) -> Self {
209 Self {
210 base: ffi::sqlite3_vtab_cursor::default(),
211 files: Vec::new(),
212 current_row: 0,
213 endpoint,
214 }
215 }
216 }
217
218 unsafe impl VTabCursor for HttpCursor {
219 fn filter(
220 &mut self,
221 _idx_num: c_int,
222 _idx_str: Option<&str>,
223 _args: &vtab::Values<'_>,
224 ) -> rusqlite::Result<()> {
225 let backend = HttpBackend::new(&self.endpoint);
227 let config = QueryConfig::default();
228
229 let files = tokio::task::block_in_place(|| {
231 tokio::runtime::Handle::current().block_on(async {
232 backend.list_files(&config).await
233 })
234 })
235 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
236
237 self.files = files;
238 self.current_row = 0;
239 Ok(())
240 }
241
242 fn next(&mut self) -> rusqlite::Result<()> {
243 self.current_row += 1;
244 Ok(())
245 }
246
247 fn eof(&self) -> bool {
248 self.current_row >= self.files.len()
249 }
250
251 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
252 if self.current_row >= self.files.len() {
253 return Ok(());
254 }
255
256 let file = &self.files[self.current_row];
257
258 match col_index {
259 columns::PATH => ctx.set_result(&file.path),
260 columns::SIZE => ctx.set_result(&(file.size as i64)),
261 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
262 columns::ETAG => ctx.set_result(&file.etag),
263 columns::IS_DIR => ctx.set_result(&file.is_dir),
264 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
265 columns::NAME => ctx.set_result(&file.name),
266 columns::CONTENT => {
267 if let Some(ref content) = file.content {
268 ctx.set_result(&content.as_slice())
269 } else {
270 ctx.set_result::<Option<&[u8]>>(&None)
271 }
272 }
273 _ => Ok(()),
274 }
275 }
276
277 fn rowid(&self) -> rusqlite::Result<i64> {
278 Ok(self.current_row as i64)
279 }
280 }
281
282 impl vtab::CreateVTab<'_> for HttpTable {
283 const KIND: VTabKind = VTabKind::EponymousOnly;
284 }
285
286 unsafe impl VTab<'_> for HttpTable {
287 type Aux = String;
288 type Cursor = HttpCursor;
289
290 fn connect(
291 _db: &mut vtab::VTabConnection,
292 aux: Option<&Self::Aux>,
293 _args: &[&[u8]],
294 ) -> rusqlite::Result<(String, Self)> {
295 let schema = "
296 CREATE TABLE x(
297 path TEXT,
298 size INTEGER,
299 last_modified TEXT,
300 etag TEXT,
301 is_dir INTEGER,
302 content_type TEXT,
303 name TEXT,
304 content BLOB
305 )
306 ";
307
308 let endpoint = if let Some(ep) = aux {
309 ep.clone()
310 } else {
311 "".to_string()
312 };
313
314 Ok((
315 schema.to_owned(),
316 HttpTable {
317 base: ffi::sqlite3_vtab::default(),
318 endpoint,
319 },
320 ))
321 }
322
323 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
324 info.set_estimated_cost(100.0);
325 Ok(())
326 }
327
328 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
329 Ok(HttpCursor::new(self.endpoint.clone()))
330 }
331 }
332
333 conn.create_module(
334 module_name,
335 eponymous_only_module::<HttpTable>(),
336 Some(endpoint_str),
337 )
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor keeps the endpoint verbatim and the backend reports
    /// its fixed name.
    #[test]
    fn test_backend_creation() {
        let url = "https://api.example.com/data";
        let http = HttpBackend::new(url);

        assert_eq!(http.endpoint, url);
        assert_eq!(http.backend_name(), "http");
    }

    /// A plain-HTTP endpoint with a port is stored unchanged as well.
    #[test]
    fn test_backend_with_different_endpoints() {
        let url = "http://localhost:8080";
        let http = HttpBackend::new(url);

        assert_eq!(http.endpoint, url);
    }
}