Skip to main content

stac_server/backend/
duckdb.rs

1use super::Backend;
2use crate::{Error, Result};
3use bb8::{ManageConnection, Pool};
4use futures_core::Stream;
5use stac::Collection;
6use stac::api::{
7    CollectionsClient, ItemsClient, Search, StreamItemsClient, TransactionClient, stream_pages,
8};
9use stac_duckdb::Client;
10
11/// A backend that uses [DuckDB](https://duckdb.org/) to query
12/// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet).
13#[derive(Clone, Debug)]
14pub struct DuckdbBackend {
15    pool: Pool<DuckdbConnectionManager>,
16}
17
18struct DuckdbConnectionManager {
19    href: String,
20}
21
22struct DuckdbConnection {
23    client: Client,
24    href: String,
25}
26
27impl DuckdbBackend {
28    /// Creates a new DuckDB backend pointing to a single **stac-geoparquet** file.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use stac_server::DuckdbBackend;
34    /// # tokio_test::block_on(async {
35    /// let backend = DuckdbBackend::new("data/100-sentinel-2-items.parquet").await.unwrap();
36    /// # })
37    /// ```
38    pub async fn new(href: impl ToString) -> Result<DuckdbBackend> {
39        let pool = Pool::builder()
40            .build(DuckdbConnectionManager {
41                href: href.to_string(),
42            })
43            .await?;
44        Ok(DuckdbBackend { pool })
45    }
46}
47
48impl ItemsClient for DuckdbBackend {
49    type Error = Error;
50
51    async fn search(&self, search: Search) -> Result<stac::api::ItemCollection> {
52        let client = self.pool.get().await.map_err(Box::new)?;
53        client.search(search)
54    }
55}
56
57impl CollectionsClient for DuckdbBackend {
58    type Error = Error;
59
60    async fn collections(&self) -> Result<Vec<Collection>> {
61        let client = self.pool.get().await.map_err(Box::new)?;
62        client.collections()
63    }
64
65    async fn collection(&self, id: &str) -> Result<Option<Collection>> {
66        let client = self.pool.get().await.map_err(Box::new)?;
67        client.collection(id)
68    }
69}
70
71impl TransactionClient for DuckdbBackend {
72    type Error = Error;
73
74    async fn add_collection(&mut self, _collection: Collection) -> Result<()> {
75        Err(Error::ReadOnly)
76    }
77
78    async fn add_item(&mut self, _item: stac::Item) -> Result<()> {
79        Err(Error::ReadOnly)
80    }
81}
82
83impl StreamItemsClient for DuckdbBackend {
84    type Error = Error;
85
86    async fn search_stream(
87        &self,
88        search: Search,
89    ) -> Result<impl Stream<Item = std::result::Result<stac::api::Item, Error>> + Send> {
90        let page = ItemsClient::search(self, search.clone()).await?;
91        Ok(stream_pages(self.clone(), search, page))
92    }
93}
94
95impl Backend for DuckdbBackend {
96    fn has_item_search(&self) -> bool {
97        true
98    }
99
100    fn has_filter(&self) -> bool {
101        false
102    }
103}
104
105impl ManageConnection for DuckdbConnectionManager {
106    type Connection = DuckdbConnection;
107    type Error = Error;
108
109    async fn connect(&self) -> Result<DuckdbConnection> {
110        DuckdbConnection::new(&self.href)
111    }
112
113    async fn is_valid(&self, _conn: &mut DuckdbConnection) -> Result<()> {
114        Ok(())
115    }
116
117    fn has_broken(&self, _conn: &mut DuckdbConnection) -> bool {
118        false
119    }
120}
121
122impl DuckdbConnection {
123    fn new(href: impl ToString) -> Result<DuckdbConnection> {
124        let client = Client::new()?;
125        Ok(DuckdbConnection {
126            client,
127            href: href.to_string(),
128        })
129    }
130
131    fn collections(&self) -> Result<Vec<Collection>> {
132        let collections = self.client.collections(&self.href)?;
133        Ok(collections)
134    }
135
136    fn collection(&self, id: &str) -> Result<Option<Collection>> {
137        let collections = self.client.collections(&self.href)?;
138        Ok(collections
139            .into_iter()
140            .find(|collection| collection.id == id))
141    }
142
143    fn search(&self, search: Search) -> Result<stac::api::ItemCollection> {
144        let item_collection = self.client.search(&self.href, search)?;
145        Ok(item_collection)
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use stac::api::CollectionsClient;
152
153    #[tokio::test]
154    async fn backend() {
155        let backend = super::DuckdbBackend::new("data/100-sentinel-2-items.parquet")
156            .await
157            .unwrap();
158        assert!(
159            backend
160                .collection("sentinel-2-l2a")
161                .await
162                .unwrap()
163                .is_some()
164        );
165    }
166}