stac_server/backend/
duckdb.rs1use super::Backend;
2use crate::{Error, Result};
3use bb8::{ManageConnection, Pool};
4use stac::Collection;
5use stac_api::Search;
6use stac_duckdb::Client;
7
8#[derive(Clone, Debug)]
11pub struct DuckdbBackend {
12 pool: Pool<DuckdbConnectionManager>,
13}
14
/// The `bb8` connection manager used by [`DuckdbBackend`]'s pool.
struct DuckdbConnectionManager {
    /// The href copied into every [`DuckdbConnection`] this manager creates.
    href: String,
}
18
19struct DuckdbConnection {
20 client: Client,
21 href: String,
22}
23
24impl DuckdbBackend {
25 pub async fn new(href: impl ToString) -> Result<DuckdbBackend> {
36 let pool = Pool::builder()
37 .build(DuckdbConnectionManager {
38 href: href.to_string(),
39 })
40 .await?;
41 Ok(DuckdbBackend { pool })
42 }
43}
44
45impl Backend for DuckdbBackend {
46 fn has_item_search(&self) -> bool {
47 true
48 }
49
50 fn has_filter(&self) -> bool {
51 false
52 }
53
54 async fn collections(&self) -> Result<Vec<Collection>> {
55 let client = self.pool.get().await.map_err(Box::new)?;
56 client.collections()
57 }
58
59 async fn collection(&self, id: &str) -> Result<Option<Collection>> {
60 let client = self.pool.get().await.map_err(Box::new)?;
61 client.collection(id)
62 }
63
64 async fn add_collection(&mut self, _collection: Collection) -> Result<()> {
65 Err(Error::ReadOnly)
66 }
67
68 async fn add_item(&mut self, _item: stac::Item) -> Result<()> {
69 Err(Error::ReadOnly)
70 }
71
72 async fn add_items(&mut self, _items: Vec<stac::Item>) -> Result<()> {
73 Err(Error::ReadOnly)
74 }
75
76 async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<stac::Item>> {
77 let mut item_collection = self
78 .search(Search {
79 ids: vec![item_id.to_string()],
80 collections: vec![collection_id.to_string()],
81 ..Default::default()
82 })
83 .await?;
84 if item_collection.items.len() == 1 {
85 Ok(Some(serde_json::from_value(serde_json::Value::Object(
86 item_collection.items.pop().unwrap(),
87 ))?))
88 } else {
89 Ok(None)
90 }
91 }
92
93 async fn items(
94 &self,
95 collection_id: &str,
96 items: stac_api::Items,
97 ) -> Result<Option<stac_api::ItemCollection>> {
98 let item_collection = self
99 .search(Search {
100 items,
101 collections: vec![collection_id.to_string()],
102 ..Default::default()
103 })
104 .await?;
105 Ok(Some(item_collection))
107 }
108
109 async fn search(&self, search: Search) -> Result<stac_api::ItemCollection> {
110 let client = self.pool.get().await.map_err(Box::new)?;
111 client.search(search)
112 }
113}
114
115impl ManageConnection for DuckdbConnectionManager {
116 type Connection = DuckdbConnection;
117 type Error = Error;
118
119 async fn connect(&self) -> Result<DuckdbConnection> {
120 DuckdbConnection::new(&self.href)
121 }
122
123 async fn is_valid(&self, _conn: &mut DuckdbConnection) -> Result<()> {
124 Ok(())
125 }
126
127 fn has_broken(&self, _conn: &mut DuckdbConnection) -> bool {
128 false
129 }
130}
131
132impl DuckdbConnection {
133 fn new(href: impl ToString) -> Result<DuckdbConnection> {
134 let client = Client::new()?;
135 Ok(DuckdbConnection {
136 client,
137 href: href.to_string(),
138 })
139 }
140
141 fn collections(&self) -> Result<Vec<Collection>> {
142 let collections = self.client.collections(&self.href)?;
143 Ok(collections)
144 }
145
146 fn collection(&self, id: &str) -> Result<Option<Collection>> {
147 let collections = self.client.collections(&self.href)?;
148 Ok(collections
149 .into_iter()
150 .find(|collection| collection.id == id))
151 }
152
153 fn search(&self, search: Search) -> Result<stac_api::ItemCollection> {
154 let item_collection = self.client.search(&self.href, search)?;
155 Ok(item_collection)
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::DuckdbBackend;
    use crate::Backend;

    /// Builds a backend over the sample parquet file and checks that the
    /// `sentinel-2-l2a` collection can be looked up.
    #[tokio::test]
    async fn backend() {
        let backend = DuckdbBackend::new("data/100-sentinel-2-items.parquet")
            .await
            .unwrap();
        let collection = backend.collection("sentinel-2-l2a").await.unwrap();
        assert!(collection.is_some());
    }
}