// stac_server/backend/pgstac.rs
use crate::{Backend, Error, Result};
2use bb8::Pool;
3use bb8_postgres::PostgresConnectionManager;
4use pgstac::Pgstac;
5use rustls::{ClientConfig, RootCertStore};
6use serde_json::Map;
7use stac::api::{CollectionSearchClient, ItemCollection, Search, SearchClient, TransactionClient};
8use stac::{Collection, Item};
9use tokio_postgres::{
10 Socket,
11 tls::{MakeTlsConnect, TlsConnect},
12};
13use tokio_postgres_rustls::MakeRustlsConnect;
14
15#[derive(Clone, Debug)]
17pub struct PgstacBackend<Tls>
18where
19 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
20 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
21 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
22 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
23{
24 pool: Pool<PostgresConnectionManager<Tls>>,
25}
26
27impl PgstacBackend<MakeRustlsConnect> {
28 pub async fn new_from_stringlike(
42 params: impl ToString,
43 ) -> Result<PgstacBackend<MakeRustlsConnect>> {
44 let _ = rustls::crypto::aws_lc_rs::default_provider()
45 .install_default()
46 .expect("The default provider should install without problems");
47 let config = ClientConfig::builder()
48 .with_root_certificates(RootCertStore::empty())
49 .with_no_client_auth();
50 let tls = MakeRustlsConnect::new(config);
51 PgstacBackend::new_from_stringlike_and_tls(params, tls).await
52 }
53}
54
55impl<Tls> PgstacBackend<Tls>
56where
57 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
58 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
59 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
60 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
61{
62 pub async fn new_from_stringlike_and_tls(
64 params: impl ToString,
65 tls: Tls,
66 ) -> Result<PgstacBackend<Tls>> {
67 let params = params.to_string();
68 let connection_manager = PostgresConnectionManager::new_from_stringlike(params, tls)?;
69 let pool = Pool::builder().build(connection_manager).await?;
70 Ok(PgstacBackend { pool })
71 }
72}
73
74impl<Tls> SearchClient for PgstacBackend<Tls>
75where
76 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
77 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
78 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
79 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
80{
81 type Error = Error;
82
83 async fn search(&self, search: Search) -> Result<ItemCollection> {
84 let client = self.pool.get().await?;
85 let page = client.search(search).await?;
86 let next_token = page.next_token();
87 let prev_token = page.prev_token();
88 let mut item_collection = ItemCollection::new(page.features)?;
89 if let Some(next_token) = next_token {
90 let mut next = Map::new();
91 let _ = next.insert("token".into(), next_token.into());
92 item_collection.next = Some(next);
93 }
94 if let Some(prev_token) = prev_token {
95 let mut prev = Map::new();
96 let _ = prev.insert("token".into(), prev_token.into());
97 item_collection.prev = Some(prev);
98 }
99 item_collection.context = page.context;
100 Ok(item_collection)
101 }
102
103 async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<Item>> {
104 let client = self.pool.get().await?;
105 let value = client.item(item_id, Some(collection_id)).await?;
106 value
107 .map(serde_json::from_value)
108 .transpose()
109 .map_err(Error::from)
110 }
111}
112
113impl<Tls> CollectionSearchClient for PgstacBackend<Tls>
114where
115 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
116 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
117 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
118 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
119{
120 type Error = Error;
121
122 async fn collections(&self) -> Result<Vec<Collection>> {
123 let client = self.pool.get().await?;
124 let values = client.collections().await?;
125 values
126 .into_iter()
127 .map(|v| serde_json::from_value(v).map_err(Error::from))
128 .collect()
129 }
130
131 async fn collection(&self, id: &str) -> Result<Option<Collection>> {
132 let client = self.pool.get().await?;
133 let value = client.collection(id).await?;
134 value
135 .map(serde_json::from_value)
136 .transpose()
137 .map_err(Error::from)
138 }
139}
140
141impl<Tls> TransactionClient for PgstacBackend<Tls>
142where
143 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
144 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
145 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
146 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
147{
148 type Error = Error;
149
150 async fn add_collection(&mut self, collection: Collection) -> Result<()> {
151 let client = self.pool.get().await?;
152 client.add_collection(collection).await.map_err(Error::from)
153 }
154
155 async fn add_item(&mut self, item: Item) -> Result<()> {
156 let client = self.pool.get().await?;
157 client.add_item(item).await.map_err(Error::from)
158 }
159
160 async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
161 tracing::debug!("adding {} items using pgstac loading", items.len());
162 let client = self.pool.get().await?;
163 client.add_items(&items).await.map_err(Error::from)
164 }
165}
166
167impl<Tls> Backend for PgstacBackend<Tls>
168where
169 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
170 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
171 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
172 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
173{
174 fn has_item_search(&self) -> bool {
175 true
176 }
177
178 fn has_filter(&self) -> bool {
179 true
180 }
181}