stac_server/backend/
pgstac.rs1use crate::{Backend, Error, Result};
2use bb8::Pool;
3use bb8_postgres::PostgresConnectionManager;
4use pgstac::Pgstac;
5use rustls::{ClientConfig, RootCertStore};
6use serde_json::Map;
7use stac::api::{ItemCollection, Items, Search};
8use stac::{Collection, Item};
9use tokio_postgres::{
10 Socket,
11 tls::{MakeTlsConnect, TlsConnect},
12};
13use tokio_postgres_rustls::MakeRustlsConnect;
14
15#[derive(Clone, Debug)]
17pub struct PgstacBackend<Tls>
18where
19 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
20 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
21 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
22 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
23{
24 pool: Pool<PostgresConnectionManager<Tls>>,
25}
26
27impl PgstacBackend<MakeRustlsConnect> {
28 pub async fn new_from_stringlike(
42 params: impl ToString,
43 ) -> Result<PgstacBackend<MakeRustlsConnect>> {
44 let _ = rustls::crypto::aws_lc_rs::default_provider()
45 .install_default()
46 .expect("The default provider should install without problems");
47 let config = ClientConfig::builder()
48 .with_root_certificates(RootCertStore::empty())
49 .with_no_client_auth();
50 let tls = MakeRustlsConnect::new(config);
51 PgstacBackend::new_from_stringlike_and_tls(params, tls).await
52 }
53}
54
55impl<Tls> PgstacBackend<Tls>
56where
57 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
58 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
59 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
60 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
61{
62 pub async fn new_from_stringlike_and_tls(
64 params: impl ToString,
65 tls: Tls,
66 ) -> Result<PgstacBackend<Tls>> {
67 let params = params.to_string();
68 let connection_manager = PostgresConnectionManager::new_from_stringlike(params, tls)?;
69 let pool = Pool::builder().build(connection_manager).await?;
70 Ok(PgstacBackend { pool })
71 }
72}
73
74impl<Tls> Backend for PgstacBackend<Tls>
75where
76 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
77 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
78 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
79 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
80{
81 fn has_item_search(&self) -> bool {
82 true
83 }
84
85 fn has_filter(&self) -> bool {
86 true
87 }
88
89 async fn add_collection(&mut self, collection: Collection) -> Result<()> {
90 let client = self.pool.get().await?;
91 client.add_collection(collection).await.map_err(Error::from)
92 }
93
94 async fn collection(&self, id: &str) -> Result<Option<Collection>> {
95 let client = self.pool.get().await?;
96 let value = client.collection(id).await?;
97 value
98 .map(serde_json::from_value)
99 .transpose()
100 .map_err(Error::from)
101 }
102
103 async fn collections(&self) -> Result<Vec<Collection>> {
104 let client = self.pool.get().await?;
105 let values = client.collections().await?;
106 values
107 .into_iter()
108 .map(|v| serde_json::from_value(v).map_err(Error::from))
109 .collect()
110 }
111
112 async fn add_item(&mut self, item: Item) -> Result<()> {
113 let client = self.pool.get().await?;
114 client.add_item(item).await.map_err(Error::from)
115 }
116
117 async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
118 tracing::debug!("adding {} items using pgstac loading", items.len());
119 let client = self.pool.get().await?;
120 client.add_items(&items).await.map_err(Error::from)
121 }
122
123 async fn items(&self, collection_id: &str, items: Items) -> Result<Option<ItemCollection>> {
124 let search = items.search_collection(collection_id);
126 self.search(search).await.map(Some)
127 }
128
129 async fn item(&self, collection_id: &str, item_id: &str) -> Result<Option<Item>> {
130 let client = self.pool.get().await?;
131 let value = client.item(item_id, Some(collection_id)).await?;
132 value
133 .map(serde_json::from_value)
134 .transpose()
135 .map_err(Error::from)
136 }
137
138 async fn search(&self, search: Search) -> Result<ItemCollection> {
139 let client = self.pool.get().await?;
140 let page = client.search(search).await?;
141 let next_token = page.next_token();
142 let prev_token = page.prev_token();
143 let mut item_collection = ItemCollection::new(page.features)?;
144 if let Some(next_token) = next_token {
145 let mut next = Map::new();
146 let _ = next.insert("token".into(), next_token.into());
147 item_collection.next = Some(next);
148 }
149 if let Some(prev_token) = prev_token {
150 let mut prev = Map::new();
151 let _ = prev.insert("token".into(), prev_token.into());
152 item_collection.prev = Some(prev);
153 }
154 item_collection.context = page.context;
155 Ok(item_collection)
156 }
157}