stac_server/backend/
duckdb.rs1use super::Backend;
2use crate::{Error, Result};
3use bb8::{ManageConnection, Pool};
4use futures_core::Stream;
5use stac::Collection;
6use stac::api::{
7 CollectionsClient, ItemsClient, Search, StreamItemsClient, TransactionClient, stream_pages,
8};
9use stac_duckdb::Client;
10
11#[derive(Clone, Debug)]
14pub struct DuckdbBackend {
15 pool: Pool<DuckdbConnectionManager>,
16}
17
/// Connection manager handed to the `bb8` pool.
///
/// It only remembers the href that each new [`DuckdbConnection`] is created
/// with.
struct DuckdbConnectionManager {
    /// Href passed to every connection this manager creates.
    href: String,
}
21
22struct DuckdbConnection {
23 client: Client,
24 href: String,
25}
26
27impl DuckdbBackend {
28 pub async fn new(href: impl ToString) -> Result<DuckdbBackend> {
39 let pool = Pool::builder()
40 .build(DuckdbConnectionManager {
41 href: href.to_string(),
42 })
43 .await?;
44 Ok(DuckdbBackend { pool })
45 }
46}
47
48impl ItemsClient for DuckdbBackend {
49 type Error = Error;
50
51 async fn search(&self, search: Search) -> Result<stac::api::ItemCollection> {
52 let client = self.pool.get().await.map_err(Box::new)?;
53 client.search(search)
54 }
55}
56
57impl CollectionsClient for DuckdbBackend {
58 type Error = Error;
59
60 async fn collections(&self) -> Result<Vec<Collection>> {
61 let client = self.pool.get().await.map_err(Box::new)?;
62 client.collections()
63 }
64
65 async fn collection(&self, id: &str) -> Result<Option<Collection>> {
66 let client = self.pool.get().await.map_err(Box::new)?;
67 client.collection(id)
68 }
69}
70
71impl TransactionClient for DuckdbBackend {
72 type Error = Error;
73
74 async fn add_collection(&mut self, _collection: Collection) -> Result<()> {
75 Err(Error::ReadOnly)
76 }
77
78 async fn add_item(&mut self, _item: stac::Item) -> Result<()> {
79 Err(Error::ReadOnly)
80 }
81}
82
83impl StreamItemsClient for DuckdbBackend {
84 type Error = Error;
85
86 async fn search_stream(
87 &self,
88 search: Search,
89 ) -> Result<impl Stream<Item = std::result::Result<stac::api::Item, Error>> + Send> {
90 let page = ItemsClient::search(self, search.clone()).await?;
91 Ok(stream_pages(self.clone(), search, page))
92 }
93}
94
95impl Backend for DuckdbBackend {
96 fn has_item_search(&self) -> bool {
97 true
98 }
99
100 fn has_filter(&self) -> bool {
101 false
102 }
103}
104
105impl ManageConnection for DuckdbConnectionManager {
106 type Connection = DuckdbConnection;
107 type Error = Error;
108
109 async fn connect(&self) -> Result<DuckdbConnection> {
110 DuckdbConnection::new(&self.href)
111 }
112
113 async fn is_valid(&self, _conn: &mut DuckdbConnection) -> Result<()> {
114 Ok(())
115 }
116
117 fn has_broken(&self, _conn: &mut DuckdbConnection) -> bool {
118 false
119 }
120}
121
122impl DuckdbConnection {
123 fn new(href: impl ToString) -> Result<DuckdbConnection> {
124 let client = Client::new()?;
125 Ok(DuckdbConnection {
126 client,
127 href: href.to_string(),
128 })
129 }
130
131 fn collections(&self) -> Result<Vec<Collection>> {
132 let collections = self.client.collections(&self.href)?;
133 Ok(collections)
134 }
135
136 fn collection(&self, id: &str) -> Result<Option<Collection>> {
137 let collections = self.client.collections(&self.href)?;
138 Ok(collections
139 .into_iter()
140 .find(|collection| collection.id == id))
141 }
142
143 fn search(&self, search: Search) -> Result<stac::api::ItemCollection> {
144 let item_collection = self.client.search(&self.href, search)?;
145 Ok(item_collection)
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::DuckdbBackend;
    use stac::api::CollectionsClient;

    /// The sample parquet file should expose the `sentinel-2-l2a` collection.
    #[tokio::test]
    async fn backend() {
        let backend = DuckdbBackend::new("data/100-sentinel-2-items.parquet")
            .await
            .unwrap();
        let found = backend.collection("sentinel-2-l2a").await.unwrap();
        assert!(found.is_some());
    }
}