// stac_server/backend/duckdb.rs

1use super::Backend;
2use crate::{Error, Result};
3use bb8::{ManageConnection, Pool};
4use stac::Collection;
5use stac_api::Search;
6use stac_duckdb::Client;
7
8/// A backend that uses [DuckDB](https://duckdb.org/) to query
9/// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet).
10#[derive(Clone, Debug)]
11pub struct DuckdbBackend {
12    pool: Pool<DuckdbConnectionManager>,
13}
14
/// Connection manager used by the pool to create [`DuckdbConnection`]s.
struct DuckdbConnectionManager {
    /// The href handed to every connection this manager creates.
    href: String,
}
18
19struct DuckdbConnection {
20    client: Client,
21    href: String,
22}
23
24impl DuckdbBackend {
25    /// Creates a new DuckDB backend pointing to a single **stac-geoparquet** file.
26    ///
27    /// # Examples
28    ///
29    /// ```
30    /// use stac_server::DuckdbBackend;
31    /// # tokio_test::block_on(async {
32    /// let backend = DuckdbBackend::new("data/100-sentinel-2-items.parquet").await.unwrap();
33    /// # })
34    /// ```
35    pub async fn new(href: impl ToString) -> Result<DuckdbBackend> {
36        let pool = Pool::builder()
37            .build(DuckdbConnectionManager {
38                href: href.to_string(),
39            })
40            .await?;
41        Ok(DuckdbBackend { pool })
42    }
43}
44
45impl Backend for DuckdbBackend {
46    fn has_item_search(&self) -> bool {
47        true
48    }
49
50    fn has_filter(&self) -> bool {
51        false
52    }
53
54    async fn collections(&self) -> Result<Vec<Collection>> {
55        let client = self.pool.get().await.map_err(Box::new)?;
56        client.collections()
57    }
58
59    async fn collection(&self, id: &str) -> Result<Option<Collection>> {
60        let client = self.pool.get().await.map_err(Box::new)?;
61        client.collection(id)
62    }
63
64    async fn add_collection(&mut self, _collection: Collection) -> Result<()> {
65        Err(Error::ReadOnly)
66    }
67
68    async fn add_item(&mut self, _item: stac::Item) -> Result<()> {
69        Err(Error::ReadOnly)
70    }
71
72    async fn add_items(&mut self, _items: Vec<stac::Item>) -> Result<()> {
73        Err(Error::ReadOnly)
74    }
75
76    async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<stac::Item>> {
77        let mut item_collection = self
78            .search(Search {
79                ids: vec![item_id.to_string()],
80                collections: vec![collection_id.to_string()],
81                ..Default::default()
82            })
83            .await?;
84        if item_collection.items.len() == 1 {
85            Ok(Some(serde_json::from_value(serde_json::Value::Object(
86                item_collection.items.pop().unwrap(),
87            ))?))
88        } else {
89            Ok(None)
90        }
91    }
92
93    async fn items(
94        &self,
95        collection_id: &str,
96        items: stac_api::Items,
97    ) -> Result<Option<stac_api::ItemCollection>> {
98        let item_collection = self
99            .search(Search {
100                items,
101                collections: vec![collection_id.to_string()],
102                ..Default::default()
103            })
104            .await?;
105        // TODO maybe return None if there's no collection?
106        Ok(Some(item_collection))
107    }
108
109    async fn search(&self, search: Search) -> Result<stac_api::ItemCollection> {
110        let client = self.pool.get().await.map_err(Box::new)?;
111        client.search(search)
112    }
113}
114
115impl ManageConnection for DuckdbConnectionManager {
116    type Connection = DuckdbConnection;
117    type Error = Error;
118
119    async fn connect(&self) -> Result<DuckdbConnection> {
120        DuckdbConnection::new(&self.href)
121    }
122
123    async fn is_valid(&self, _conn: &mut DuckdbConnection) -> Result<()> {
124        Ok(())
125    }
126
127    fn has_broken(&self, _conn: &mut DuckdbConnection) -> bool {
128        false
129    }
130}
131
132impl DuckdbConnection {
133    fn new(href: impl ToString) -> Result<DuckdbConnection> {
134        let client = Client::new()?;
135        Ok(DuckdbConnection {
136            client,
137            href: href.to_string(),
138        })
139    }
140
141    fn collections(&self) -> Result<Vec<Collection>> {
142        let collections = self.client.collections(&self.href)?;
143        Ok(collections)
144    }
145
146    fn collection(&self, id: &str) -> Result<Option<Collection>> {
147        let collections = self.client.collections(&self.href)?;
148        Ok(collections
149            .into_iter()
150            .find(|collection| collection.id == id))
151    }
152
153    fn search(&self, search: Search) -> Result<stac_api::ItemCollection> {
154        let item_collection = self.client.search(&self.href, search)?;
155        Ok(item_collection)
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::DuckdbBackend;
    use crate::Backend;

    /// The sample file should expose the `sentinel-2-l2a` collection.
    #[tokio::test]
    async fn backend() {
        let backend = DuckdbBackend::new("data/100-sentinel-2-items.parquet")
            .await
            .unwrap();
        let collection = backend.collection("sentinel-2-l2a").await.unwrap();
        assert!(collection.is_some());
    }
}