Skip to main content

sqlite_vtable_opendal/backends/
http.rs

1//! HTTP storage backend implementation
2//!
3//! This backend allows fetching files from HTTP/HTTPS endpoints.
4//! It treats HTTP resources as "files" that can be queried.
5
6use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use opendal::{services::Http, Operator};
11
/// HTTP storage backend
///
/// This backend uses OpenDAL's HTTP service to fetch files from HTTP endpoints.
/// Each HTTP resource is treated as a file.
///
/// The type is `Debug` so it can be logged or embedded in other `Debug`
/// types, and `Clone` so an independent copy of the configuration can be made.
///
/// # Example
///
/// ```rust,ignore
/// use sqlite_vtable_opendal::backends::http::HttpBackend;
/// use sqlite_vtable_opendal::types::QueryConfig;
///
/// let backend = HttpBackend::new("https://api.example.com");
/// let config = QueryConfig::default();
/// let files = backend.list_files(&config).await?;
/// ```
#[derive(Debug, Clone)]
pub struct HttpBackend {
    /// Base HTTP endpoint URL
    endpoint: String,
}
31
32impl HttpBackend {
33    /// Create a new HTTP backend
34    ///
35    /// # Arguments
36    ///
37    /// * `endpoint` - Base HTTP endpoint URL (e.g., "https://api.example.com")
38    ///
39    /// # Example
40    ///
41    /// ```
42    /// use sqlite_vtable_opendal::backends::http::HttpBackend;
43    ///
44    /// let backend = HttpBackend::new("https://api.example.com/data");
45    /// ```
46    pub fn new(endpoint: impl Into<String>) -> Self {
47        Self {
48            endpoint: endpoint.into(),
49        }
50    }
51
52    /// Create an OpenDAL operator for HTTP
53    fn create_operator(&self) -> Result<Operator> {
54        let builder = Http::default().endpoint(&self.endpoint);
55
56        Operator::new(builder)
57            .map(|op| op.finish())
58            .map_err(|e| VTableError::OpenDal(e))
59    }
60}
61
62#[async_trait]
63impl StorageBackend for HttpBackend {
64    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
65        let operator = self.create_operator()?;
66        let mut results = Vec::new();
67
68        // Normalize the path
69        let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
70            "".to_string()
71        } else {
72            config.root_path.trim_matches('/').to_string()
73        };
74
75        // For HTTP, we typically fetch a single resource
76        // Try to get metadata first
77        let path = if normalized_path.is_empty() {
78            "".to_string()
79        } else {
80            normalized_path.clone()
81        };
82
83        match operator.stat(&path).await {
84            Ok(metadata) => {
85                // Optionally fetch content
86                let content = if config.fetch_content {
87                    operator.read(&path).await.ok().map(|bytes| bytes.to_vec())
88                } else {
89                    None
90                };
91
92                // Extract file name from path
93                let name = if path.is_empty() {
94                    "index".to_string()
95                } else {
96                    path.split('/').last().unwrap_or(&path).to_string()
97                };
98
99                results.push(FileMetadata {
100                    name,
101                    path: if path.is_empty() {
102                        "/".to_string()
103                    } else {
104                        format!("/{}", path)
105                    },
106                    size: metadata.content_length(),
107                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
108                    etag: metadata.etag().map(|e| e.to_string()),
109                    is_dir: false,
110                    content_type: metadata.content_type().map(|ct| ct.to_string()),
111                    content,
112                });
113            }
114            Err(_) => {
115                // If stat fails, try to read the content directly
116                if let Ok(bytes) = operator.read(&path).await {
117                    let name = if path.is_empty() {
118                        "index".to_string()
119                    } else {
120                        path.split('/').last().unwrap_or(&path).to_string()
121                    };
122
123                    let content_data = bytes.to_vec();
124                    let size = content_data.len() as u64;
125
126                    results.push(FileMetadata {
127                        name,
128                        path: if path.is_empty() {
129                            "/".to_string()
130                        } else {
131                            format!("/{}", path)
132                        },
133                        size,
134                        last_modified: None,
135                        etag: None,
136                        is_dir: false,
137                        content_type: Some("application/octet-stream".to_string()),
138                        content: if config.fetch_content {
139                            Some(content_data)
140                        } else {
141                            None
142                        },
143                    });
144                }
145            }
146        }
147
148        Ok(results)
149    }
150
151    fn backend_name(&self) -> &'static str {
152        "http"
153    }
154}
155
156/// Register the http virtual table with SQLite
157///
158/// This function creates a virtual table module that allows querying
159/// HTTP resources using SQL.
160///
161/// # Arguments
162///
163/// * `conn` - SQLite connection
164/// * `module_name` - Name for the virtual table (e.g., "http_data")
165/// * `endpoint` - Base HTTP endpoint URL
166///
167/// # Example
168///
169/// ```rust,ignore
170/// use rusqlite::Connection;
171/// use sqlite_vtable_opendal::backends::http;
172///
173/// let conn = Connection::open_in_memory()?;
174/// http::register(&conn, "http_data", "https://api.example.com")?;
175///
176/// // Now you can query: SELECT * FROM http_data
177/// ```
178pub fn register(
179    conn: &rusqlite::Connection,
180    module_name: &str,
181    endpoint: impl Into<String>,
182) -> rusqlite::Result<()> {
183    use crate::types::{columns, QueryConfig};
184    use rusqlite::{
185        ffi,
186        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
187    };
188    use std::os::raw::c_int;
189
190    let endpoint_str = endpoint.into();
191
192    // Create a specific table type for HTTP
193    #[repr(C)]
194    struct HttpTable {
195        base: ffi::sqlite3_vtab,
196        endpoint: String,
197    }
198
199    // Create a specific cursor type for HTTP
200    #[repr(C)]
201    struct HttpCursor {
202        base: ffi::sqlite3_vtab_cursor,
203        files: Vec<crate::types::FileMetadata>,
204        current_row: usize,
205        endpoint: String,
206    }
207
208    impl HttpCursor {
209        fn new(endpoint: String) -> Self {
210            Self {
211                base: ffi::sqlite3_vtab_cursor::default(),
212                files: Vec::new(),
213                current_row: 0,
214                endpoint,
215            }
216        }
217    }
218
219    unsafe impl VTabCursor for HttpCursor {
220        fn filter(
221            &mut self,
222            _idx_num: c_int,
223            _idx_str: Option<&str>,
224            _args: &vtab::Values<'_>,
225        ) -> rusqlite::Result<()> {
226            // Create backend and fetch files
227            let backend = HttpBackend::new(&self.endpoint);
228            let config = QueryConfig::default();
229
230            // Fetch files from the backend (blocking the async call)
231            let files = tokio::task::block_in_place(|| {
232                tokio::runtime::Handle::current().block_on(async {
233                    backend.list_files(&config).await
234                })
235            })
236            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
237
238            self.files = files;
239            self.current_row = 0;
240            Ok(())
241        }
242
243        fn next(&mut self) -> rusqlite::Result<()> {
244            self.current_row += 1;
245            Ok(())
246        }
247
248        fn eof(&self) -> bool {
249            self.current_row >= self.files.len()
250        }
251
252        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
253            if self.current_row >= self.files.len() {
254                return Ok(());
255            }
256
257            let file = &self.files[self.current_row];
258
259            match col_index {
260                columns::PATH => ctx.set_result(&file.path),
261                columns::SIZE => ctx.set_result(&(file.size as i64)),
262                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
263                columns::ETAG => ctx.set_result(&file.etag),
264                columns::IS_DIR => ctx.set_result(&file.is_dir),
265                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
266                columns::NAME => ctx.set_result(&file.name),
267                columns::CONTENT => {
268                    if let Some(ref content) = file.content {
269                        ctx.set_result(&content.as_slice())
270                    } else {
271                        ctx.set_result::<Option<&[u8]>>(&None)
272                    }
273                }
274                _ => Ok(()),
275            }
276        }
277
278        fn rowid(&self) -> rusqlite::Result<i64> {
279            Ok(self.current_row as i64)
280        }
281    }
282
283    impl vtab::CreateVTab<'_> for HttpTable {
284        const KIND: VTabKind = VTabKind::EponymousOnly;
285    }
286
287    unsafe impl VTab<'_> for HttpTable {
288        type Aux = String;
289        type Cursor = HttpCursor;
290
291        fn connect(
292            _db: &mut vtab::VTabConnection,
293            aux: Option<&Self::Aux>,
294            _args: &[&[u8]],
295        ) -> rusqlite::Result<(String, Self)> {
296            let schema = "
297                CREATE TABLE x(
298                    path TEXT,
299                    size INTEGER,
300                    last_modified TEXT,
301                    etag TEXT,
302                    is_dir INTEGER,
303                    content_type TEXT,
304                    name TEXT,
305                    content BLOB
306                )
307            ";
308
309            let endpoint = if let Some(ep) = aux {
310                ep.clone()
311            } else {
312                "".to_string()
313            };
314
315            Ok((
316                schema.to_owned(),
317                HttpTable {
318                    base: ffi::sqlite3_vtab::default(),
319                    endpoint,
320                },
321            ))
322        }
323
324        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
325            info.set_estimated_cost(100.0);
326            Ok(())
327        }
328
329        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
330            Ok(HttpCursor::new(self.endpoint.clone()))
331        }
332    }
333
334    conn.create_module(
335        module_name,
336        eponymous_only_module::<HttpTable>(),
337        Some(endpoint_str),
338    )
339}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built backend keeps the endpoint verbatim and reports its name.
    #[test]
    fn test_backend_creation() {
        let url = "https://api.example.com/data";
        let backend = HttpBackend::new(url);

        assert_eq!(backend.endpoint, url);
        assert_eq!(backend.backend_name(), "http");
    }

    /// Plain-HTTP endpoints with an explicit port are stored unchanged too.
    #[test]
    fn test_backend_with_different_endpoints() {
        let url = "http://localhost:8080";
        let backend = HttpBackend::new(url);

        assert_eq!(backend.endpoint, url);
    }

    // Note: Integration tests with actual HTTP endpoints would require a test server
    // and are better suited for manual testing or CI with mock servers
}