Skip to main content

tonin_core/
state.rs

1//! Pre-wired DB + cache connections.
2//!
3//! [`State`] holds the connection handles a service needs. It's
4//! constructed at boot via [`State::from_env`], threaded through to
5//! handlers as `Arc<State>` (or via a builder field), and used by app
6//! code with sqlx / redis directly. No abstraction layer — the
7//! framework owns the *connection*, you own the *queries*.
8//!
9//! ## Activation
10//!
11//! All fields are optional. DB + cache are always-emitted (env-driven);
12//! object storage is scaffold-time opt-in via `tonin service new
13//! --with-storage <kind>`:
14//!
15//! | Field          | Source                                          |
16//! |----------------|-------------------------------------------------|
17//! | `pg`           | `DATABASE_URL`                                  |
18//! | `redis`        | `REDIS_URL`                                     |
19//! | `storage`      | scaffolded `--with-storage` + `STORAGE_*` env   |
20//!
21//! Absent env → field stays `None`, no connection attempt. So a service
22//! that doesn't use a given backend still compiles and runs — calling
23//! `state.pg()` / `state.storage()` returns `Err(Error::Config("…not
24//! set"))` if reached.
25//!
26//! ## Extensibility — `StorageProvider` trait
27//!
28//! Object storage hides behind a [`StorageProvider`] trait so users
29//! can swap the default opendal-based implementation for anything
30//! else (a custom client, an in-memory mock, a different SDK) without
31//! touching the framework. The scaffold's `--with-storage` flag emits
32//! an `OpendalStorage` impl into `state.rs`, but any
33//! `Arc<dyn StorageProvider>` works in its place.
34//!
35//! ## Why concrete types, not traits
36//!
37//! The `Database` / `Cache` traits in [`crate::traits`] exist for
38//! capability discovery (telemetry attributes, swappable impls down
39//! the road). For the actual query path, services should use sqlx /
40//! redis directly — those libraries already produce OTel spans when a
41//! tracer provider is installed, and wrapping them in a thin trait
42//! layer mostly costs ergonomics.
43
44use std::sync::Arc;
45
46use async_trait::async_trait;
47
48use crate::error::{Error, Result};
49
50/// Pluggable object-storage backend.
51///
52/// The default impl emitted by `--with-storage` wraps an
53/// `opendal::Operator`, but anything matching this trait works. The
54/// framework only calls [`StorageProvider::probe`] at boot — every
55/// other call goes through whatever concrete API the impl exposes
56/// (so users get full opendal surface without going through this
57/// trait if they're using the default).
58///
59/// To wire a custom provider: construct it yourself, then
60/// [`State::with_storage`] (no need to set env vars).
61#[async_trait]
62pub trait StorageProvider: Send + Sync + 'static {
63    /// Cheap connectivity check. The scaffold's default impl calls
64    /// `Operator::list("/").limit(1).await` and ignores the contents
65    /// — we just want to know we can talk to the bucket. Errors
66    /// here should cause the service to fail to start.
67    async fn probe(&self) -> Result<()>;
68
69    /// Tag used in span attributes (`storage.system`). Conventions:
70    /// `"s3"`, `"gcs"`, `"azure"`, `"local"`, `"memory"`. The default
71    /// returns `"custom"` so impls don't have to override unless they
72    /// want telemetry to know.
73    fn system(&self) -> &'static str {
74        "custom"
75    }
76}
77
78/// Bundle of optional connection handles. Cheap to clone — all inner
79/// types are reference-counted.
80#[derive(Clone, Default)]
81pub struct State {
82    pg: Option<sqlx::PgPool>,
83    redis: Option<Arc<redis::Client>>,
84    storage: Option<Arc<dyn StorageProvider>>,
85}
86
87impl State {
88    /// Attach a custom [`StorageProvider`]. Probes it immediately and
89    /// returns the error if the probe fails — same contract as the
90    /// other backends ("fail fast at boot").
91    pub async fn with_storage<S: StorageProvider>(mut self, storage: S) -> Result<Self> {
92        storage.probe().await?;
93        self.storage = Some(Arc::new(storage));
94        Ok(self)
95    }
96
97    /// Attach a pre-boxed provider without re-probing. Use when the
98    /// caller has already done their own validation, or when injecting
99    /// a mock from tests.
100    pub fn set_storage(&mut self, storage: Arc<dyn StorageProvider>) {
101        self.storage = Some(storage);
102    }
103
104    /// Whether a storage provider is wired.
105    pub fn has_storage(&self) -> bool {
106        self.storage.is_some()
107    }
108
109    /// Borrow the trait-object storage handle. Returns a `Config` error
110    /// if no provider was wired (scaffold without `--with-storage`, or
111    /// `STORAGE_BUCKET` unset in the generated init).
112    pub fn storage(&self) -> Result<&Arc<dyn StorageProvider>> {
113        self.storage.as_ref().ok_or_else(|| {
114            Error::Config(
115                "storage requested but no provider was wired at startup \
116                 (scaffold with --with-storage to enable)"
117                    .into(),
118            )
119        })
120    }
121
122    /// Build a `State` from environment variables. Tries each backend
123    /// independently; missing env vars produce `None` fields, not
124    /// errors. Connection failures DO error — if `DATABASE_URL` is set
125    /// but unreachable, that's a deploy-time problem and the service
126    /// should fail to start.
127    ///
128    /// Object storage is **not** initialized here — its concrete client
129    /// type (opendal::Operator, AWS SDK, etc.) is scaffold-time opt-in,
130    /// so this framework crate doesn't pull those deps. The scaffolded
131    /// `main.rs` calls [`State::with_storage`] separately when
132    /// `--with-storage` was used.
133    pub async fn from_env() -> Result<Self> {
134        let pg = match std::env::var("DATABASE_URL") {
135            Ok(url) => {
136                tracing::info!(target: "tonin::state", "connecting to postgres");
137                let pool = sqlx::postgres::PgPoolOptions::new()
138                    .max_connections(default_pg_max_conns())
139                    .connect(&url)
140                    .await
141                    .map_err(|e| Error::Config(format!("postgres connect failed: {e}")))?;
142                Some(pool)
143            }
144            Err(_) => None,
145        };
146
147        let redis = match std::env::var("REDIS_URL") {
148            Ok(url) => {
149                tracing::info!(target: "tonin::state", "connecting to redis");
150                let client = redis::Client::open(url)
151                    .map_err(|e| Error::Config(format!("redis client init: {e}")))?;
152                // Eagerly verify reachability so a misconfigured cache fails fast.
153                let mut conn = client
154                    .get_multiplexed_async_connection()
155                    .await
156                    .map_err(|e| Error::Config(format!("redis connect failed: {e}")))?;
157                let _: String = redis::cmd("PING")
158                    .query_async(&mut conn)
159                    .await
160                    .map_err(|e| Error::Config(format!("redis PING failed: {e}")))?;
161                Some(Arc::new(client))
162            }
163            Err(_) => None,
164        };
165
166        Ok(Self {
167            pg,
168            redis,
169            storage: None,
170        })
171    }
172
173    /// Borrow the Postgres pool. Returns a `Config` error if
174    /// `DATABASE_URL` was not set at boot.
175    pub fn pg(&self) -> Result<&sqlx::PgPool> {
176        self.pg.as_ref().ok_or_else(|| {
177            Error::Config("postgres requested but DATABASE_URL was not set at startup".into())
178        })
179    }
180
181    /// Whether the Postgres pool is available.
182    pub fn has_pg(&self) -> bool {
183        self.pg.is_some()
184    }
185
186    /// Borrow the Redis client. Returns a `Config` error if `REDIS_URL`
187    /// was not set at boot.
188    pub fn redis(&self) -> Result<&redis::Client> {
189        self.redis.as_deref().ok_or_else(|| {
190            Error::Config("redis requested but REDIS_URL was not set at startup".into())
191        })
192    }
193
194    /// Whether the Redis client is available.
195    pub fn has_redis(&self) -> bool {
196        self.redis.is_some()
197    }
198}
199
200/// Default max pool size. Overridable via `TONIN_PG_MAX_CONNECTIONS`.
201fn default_pg_max_conns() -> u32 {
202    std::env::var("TONIN_PG_MAX_CONNECTIONS")
203        .ok()
204        .and_then(|s| s.parse().ok())
205        .unwrap_or(10)
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211    use std::sync::atomic::{AtomicUsize, Ordering};
212
213    /// In-memory mock implementing [`StorageProvider`]. Verifies the
214    /// trait is implementable without the opendal dep — same shape any
215    /// custom user impl would take.
216    struct MockStorage {
217        probes: AtomicUsize,
218        probe_fails: bool,
219    }
220
221    #[async_trait]
222    impl StorageProvider for MockStorage {
223        async fn probe(&self) -> Result<()> {
224            self.probes.fetch_add(1, Ordering::SeqCst);
225            if self.probe_fails {
226                Err(Error::Config("mock probe failure".into()))
227            } else {
228                Ok(())
229            }
230        }
231        fn system(&self) -> &'static str {
232            "memory"
233        }
234    }
235
236    #[tokio::test]
237    async fn empty_state_when_no_env_vars() {
238        // Guard against the test environment leaking these. If they
239        // happen to be set on the dev box, skip — the assertion below
240        // wouldn't hold but it's not a code bug.
241        if std::env::var("DATABASE_URL").is_ok() || std::env::var("REDIS_URL").is_ok() {
242            return;
243        }
244        let state = State::from_env().await.unwrap();
245        assert!(!state.has_pg());
246        assert!(!state.has_redis());
247        assert!(!state.has_storage());
248        assert!(state.pg().is_err());
249        assert!(state.redis().is_err());
250        assert!(state.storage().is_err());
251    }
252
253    #[tokio::test]
254    async fn with_storage_runs_probe() {
255        let state = State::default();
256        let storage = MockStorage {
257            probes: AtomicUsize::new(0),
258            probe_fails: false,
259        };
260        let state = state.with_storage(storage).await.unwrap();
261        assert!(state.has_storage());
262        assert_eq!(state.storage().unwrap().system(), "memory");
263    }
264
265    #[tokio::test]
266    async fn with_storage_propagates_probe_failure() {
267        let state = State::default();
268        let storage = MockStorage {
269            probes: AtomicUsize::new(0),
270            probe_fails: true,
271        };
272        match state.with_storage(storage).await {
273            Ok(_) => panic!("expected probe failure to propagate"),
274            Err(Error::Config(_)) => {}
275            Err(other) => panic!("expected Config, got {other:?}"),
276        }
277    }
278}