Skip to main content

stac_server/backend/
pgstac.rs

1use crate::{Backend, Error, Result};
2use bb8::Pool;
3use bb8_postgres::PostgresConnectionManager;
4use futures_core::Stream;
5use pgstac::Pgstac;
6use rustls::{ClientConfig, RootCertStore};
7use serde_json::Map;
8use stac::api::{
9    CollectionsClient, ItemCollection, ItemsClient, Search, StreamItemsClient, TransactionClient,
10    stream_pages,
11};
12use stac::{Collection, Item};
13use tokio_postgres::{
14    Socket,
15    tls::{MakeTlsConnect, TlsConnect},
16};
17use tokio_postgres_rustls::MakeRustlsConnect;
18
19/// A backend for a [pgstac](https://github.com/stac-utils/pgstac) database.
20#[derive(Clone, Debug)]
21pub struct PgstacBackend<Tls>
22where
23    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
24    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
25    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
26    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
27{
28    pool: Pool<PostgresConnectionManager<Tls>>,
29}
30
31impl PgstacBackend<MakeRustlsConnect> {
32    /// Creates a new PgstacBackend from a string-like configuration.
33    ///
34    /// This will use an unverified tls. To provide your own tls, use
35    /// [PgstacBackend::new_from_stringlike_and_tls].
36    ///
37    /// # Examples
38    ///
39    /// ```no_run
40    /// use stac_server::PgstacBackend;
41    /// # tokio_test::block_on(async {
42    /// let backend = PgstacBackend::new_from_stringlike("postgresql://username:password@localhost:5432/postgis").await.unwrap();
43    /// # })
44    /// ```
45    pub async fn new_from_stringlike(
46        params: impl ToString,
47    ) -> Result<PgstacBackend<MakeRustlsConnect>> {
48        let _ = rustls::crypto::aws_lc_rs::default_provider()
49            .install_default()
50            .expect("The default provider should install without problems");
51        let config = ClientConfig::builder()
52            .with_root_certificates(RootCertStore::empty())
53            .with_no_client_auth();
54        let tls = MakeRustlsConnect::new(config);
55        PgstacBackend::new_from_stringlike_and_tls(params, tls).await
56    }
57}
58
59impl<Tls> PgstacBackend<Tls>
60where
61    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
62    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
63    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
64    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
65{
66    /// Creates a new PgstacBackend from a string-like configuration and a tls.
67    pub async fn new_from_stringlike_and_tls(
68        params: impl ToString,
69        tls: Tls,
70    ) -> Result<PgstacBackend<Tls>> {
71        let params = params.to_string();
72        let connection_manager = PostgresConnectionManager::new_from_stringlike(params, tls)?;
73        let pool = Pool::builder().build(connection_manager).await?;
74        Ok(PgstacBackend { pool })
75    }
76}
77
78impl<Tls> ItemsClient for PgstacBackend<Tls>
79where
80    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
81    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
82    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
83    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
84{
85    type Error = Error;
86
87    async fn search(&self, search: Search) -> Result<ItemCollection> {
88        let client = self.pool.get().await?;
89        let page = client.search(search).await?;
90        let next_token = page.next_token();
91        let prev_token = page.prev_token();
92        let mut item_collection = ItemCollection::new(page.features)?;
93        if let Some(next_token) = next_token {
94            let mut next = Map::new();
95            let _ = next.insert("token".into(), next_token.into());
96            item_collection.next = Some(next);
97        }
98        if let Some(prev_token) = prev_token {
99            let mut prev = Map::new();
100            let _ = prev.insert("token".into(), prev_token.into());
101            item_collection.prev = Some(prev);
102        }
103        item_collection.context = page.context;
104        Ok(item_collection)
105    }
106
107    async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<Item>> {
108        let client = self.pool.get().await?;
109        let value = client.item(item_id, Some(collection_id)).await?;
110        value
111            .map(serde_json::from_value)
112            .transpose()
113            .map_err(Error::from)
114    }
115}
116
117impl<Tls> CollectionsClient for PgstacBackend<Tls>
118where
119    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
120    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
121    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
122    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
123{
124    type Error = Error;
125
126    async fn collections(&self) -> Result<Vec<Collection>> {
127        let client = self.pool.get().await?;
128        let values = client.collections().await?;
129        values
130            .into_iter()
131            .map(|v| serde_json::from_value(v).map_err(Error::from))
132            .collect()
133    }
134
135    async fn collection(&self, id: &str) -> Result<Option<Collection>> {
136        let client = self.pool.get().await?;
137        let value = client.collection(id).await?;
138        value
139            .map(serde_json::from_value)
140            .transpose()
141            .map_err(Error::from)
142    }
143}
144
145impl<Tls> TransactionClient for PgstacBackend<Tls>
146where
147    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
148    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
149    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
150    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
151{
152    type Error = Error;
153
154    async fn add_collection(&mut self, collection: Collection) -> Result<()> {
155        let client = self.pool.get().await?;
156        client.add_collection(collection).await.map_err(Error::from)
157    }
158
159    async fn add_item(&mut self, item: Item) -> Result<()> {
160        let client = self.pool.get().await?;
161        client.add_item(item).await.map_err(Error::from)
162    }
163
164    async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
165        tracing::debug!("adding {} items using pgstac loading", items.len());
166        let client = self.pool.get().await?;
167        client.add_items(&items).await.map_err(Error::from)
168    }
169}
170
171impl<Tls> StreamItemsClient for PgstacBackend<Tls>
172where
173    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
174    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
175    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
176    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
177{
178    type Error = Error;
179
180    async fn search_stream(
181        &self,
182        search: Search,
183    ) -> Result<impl Stream<Item = std::result::Result<stac::api::Item, Error>> + Send> {
184        let page = ItemsClient::search(self, search.clone()).await?;
185        Ok(stream_pages(self.clone(), search, page))
186    }
187}
188
189impl<Tls> Backend for PgstacBackend<Tls>
190where
191    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
192    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
193    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
194    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
195{
196    fn has_item_search(&self) -> bool {
197        true
198    }
199
200    fn has_filter(&self) -> bool {
201        true
202    }
203}