stac_server/backend/
pgstac.rs1use crate::{Backend, Error, Result};
2use bb8::Pool;
3use bb8_postgres::PostgresConnectionManager;
4use futures_core::Stream;
5use pgstac::Pgstac;
6use rustls::{ClientConfig, RootCertStore};
7use serde_json::Map;
8use stac::api::{
9 CollectionsClient, ItemCollection, ItemsClient, Search, StreamItemsClient, TransactionClient,
10 stream_pages,
11};
12use stac::{Collection, Item};
13use tokio_postgres::{
14 Socket,
15 tls::{MakeTlsConnect, TlsConnect},
16};
17use tokio_postgres_rustls::MakeRustlsConnect;
18
19#[derive(Clone, Debug)]
21pub struct PgstacBackend<Tls>
22where
23 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
24 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
25 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
26 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
27{
28 pool: Pool<PostgresConnectionManager<Tls>>,
29}
30
31impl PgstacBackend<MakeRustlsConnect> {
32 pub async fn new_from_stringlike(
46 params: impl ToString,
47 ) -> Result<PgstacBackend<MakeRustlsConnect>> {
48 let _ = rustls::crypto::aws_lc_rs::default_provider()
49 .install_default()
50 .expect("The default provider should install without problems");
51 let config = ClientConfig::builder()
52 .with_root_certificates(RootCertStore::empty())
53 .with_no_client_auth();
54 let tls = MakeRustlsConnect::new(config);
55 PgstacBackend::new_from_stringlike_and_tls(params, tls).await
56 }
57}
58
59impl<Tls> PgstacBackend<Tls>
60where
61 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
62 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
63 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
64 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
65{
66 pub async fn new_from_stringlike_and_tls(
68 params: impl ToString,
69 tls: Tls,
70 ) -> Result<PgstacBackend<Tls>> {
71 let params = params.to_string();
72 let connection_manager = PostgresConnectionManager::new_from_stringlike(params, tls)?;
73 let pool = Pool::builder().build(connection_manager).await?;
74 Ok(PgstacBackend { pool })
75 }
76}
77
78impl<Tls> ItemsClient for PgstacBackend<Tls>
79where
80 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
81 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
82 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
83 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
84{
85 type Error = Error;
86
87 async fn search(&self, search: Search) -> Result<ItemCollection> {
88 let client = self.pool.get().await?;
89 let page = client.search(search).await?;
90 let next_token = page.next_token();
91 let prev_token = page.prev_token();
92 let mut item_collection = ItemCollection::new(page.features)?;
93 if let Some(next_token) = next_token {
94 let mut next = Map::new();
95 let _ = next.insert("token".into(), next_token.into());
96 item_collection.next = Some(next);
97 }
98 if let Some(prev_token) = prev_token {
99 let mut prev = Map::new();
100 let _ = prev.insert("token".into(), prev_token.into());
101 item_collection.prev = Some(prev);
102 }
103 item_collection.context = page.context;
104 Ok(item_collection)
105 }
106
107 async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<Item>> {
108 let client = self.pool.get().await?;
109 let value = client.item(item_id, Some(collection_id)).await?;
110 value
111 .map(serde_json::from_value)
112 .transpose()
113 .map_err(Error::from)
114 }
115}
116
117impl<Tls> CollectionsClient for PgstacBackend<Tls>
118where
119 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
120 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
121 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
122 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
123{
124 type Error = Error;
125
126 async fn collections(&self) -> Result<Vec<Collection>> {
127 let client = self.pool.get().await?;
128 let values = client.collections().await?;
129 values
130 .into_iter()
131 .map(|v| serde_json::from_value(v).map_err(Error::from))
132 .collect()
133 }
134
135 async fn collection(&self, id: &str) -> Result<Option<Collection>> {
136 let client = self.pool.get().await?;
137 let value = client.collection(id).await?;
138 value
139 .map(serde_json::from_value)
140 .transpose()
141 .map_err(Error::from)
142 }
143}
144
145impl<Tls> TransactionClient for PgstacBackend<Tls>
146where
147 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
148 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
149 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
150 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
151{
152 type Error = Error;
153
154 async fn add_collection(&mut self, collection: Collection) -> Result<()> {
155 let client = self.pool.get().await?;
156 client.add_collection(collection).await.map_err(Error::from)
157 }
158
159 async fn add_item(&mut self, item: Item) -> Result<()> {
160 let client = self.pool.get().await?;
161 client.add_item(item).await.map_err(Error::from)
162 }
163
164 async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
165 tracing::debug!("adding {} items using pgstac loading", items.len());
166 let client = self.pool.get().await?;
167 client.add_items(&items).await.map_err(Error::from)
168 }
169}
170
171impl<Tls> StreamItemsClient for PgstacBackend<Tls>
172where
173 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
174 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
175 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
176 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
177{
178 type Error = Error;
179
180 async fn search_stream(
181 &self,
182 search: Search,
183 ) -> Result<impl Stream<Item = std::result::Result<stac::api::Item, Error>> + Send> {
184 let page = ItemsClient::search(self, search.clone()).await?;
185 Ok(stream_pages(self.clone(), search, page))
186 }
187}
188
189impl<Tls> Backend for PgstacBackend<Tls>
190where
191 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
192 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
193 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
194 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
195{
196 fn has_item_search(&self) -> bool {
197 true
198 }
199
200 fn has_filter(&self) -> bool {
201 true
202 }
203}