tonin_core/state.rs
1//! Pre-wired DB + cache connections.
2//!
3//! [`State`] holds the connection handles a service needs. It's
4//! constructed at boot via [`State::from_env`], threaded through to
5//! handlers as `Arc<State>` (or via a builder field), and used by app
6//! code with sqlx / redis directly. No abstraction layer — the
7//! framework owns the *connection*, you own the *queries*.
8//!
9//! ## Activation
10//!
11//! All fields are optional. DB + cache are always-emitted (env-driven);
12//! object storage is scaffold-time opt-in via `tonin service new
13//! --with-storage <kind>`:
14//!
15//! | Field | Source |
16//! |----------------|-------------------------------------------------|
17//! | `pg` | `DATABASE_URL` |
18//! | `redis` | `REDIS_URL` |
19//! | `storage` | scaffolded `--with-storage` + `STORAGE_*` env |
20//!
21//! Absent env → field stays `None`, no connection attempt. So a service
22//! that doesn't use a given backend still compiles and runs — calling
23//! `state.pg()` / `state.storage()` returns `Err(Error::Config("…not
24//! set"))` if reached.
25//!
26//! ## Extensibility — `StorageProvider` trait
27//!
28//! Object storage hides behind a [`StorageProvider`] trait so users
29//! can swap the default opendal-based implementation for anything
30//! else (a custom client, an in-memory mock, a different SDK) without
31//! touching the framework. The scaffold's `--with-storage` flag emits
32//! an `OpendalStorage` impl into `state.rs`, but any
33//! `Arc<dyn StorageProvider>` works in its place.
34//!
35//! ## Why concrete types, not traits
36//!
37//! The `Database` / `Cache` traits in [`crate::traits`] exist for
38//! capability discovery (telemetry attributes, swappable impls down
39//! the road). For the actual query path, services should use sqlx /
40//! redis directly — those libraries already produce OTel spans when a
41//! tracer provider is installed, and wrapping them in a thin trait
42//! layer mostly costs ergonomics.
43
44use std::sync::Arc;
45
46use async_trait::async_trait;
47
48use crate::error::{Error, Result};
49
50/// Pluggable object-storage backend.
51///
52/// The default impl emitted by `--with-storage` wraps an
53/// `opendal::Operator`, but anything matching this trait works. The
54/// framework only calls [`StorageProvider::probe`] at boot — every
55/// other call goes through whatever concrete API the impl exposes
56/// (so users get full opendal surface without going through this
57/// trait if they're using the default).
58///
59/// To wire a custom provider: construct it yourself, then
60/// [`State::with_storage`] (no need to set env vars).
61#[async_trait]
62pub trait StorageProvider: Send + Sync + 'static {
63 /// Cheap connectivity check. The scaffold's default impl calls
64 /// `Operator::list("/").limit(1).await` and ignores the contents
65 /// — we just want to know we can talk to the bucket. Errors
66 /// here should cause the service to fail to start.
67 async fn probe(&self) -> Result<()>;
68
69 /// Tag used in span attributes (`storage.system`). Conventions:
70 /// `"s3"`, `"gcs"`, `"azure"`, `"local"`, `"memory"`. The default
71 /// returns `"custom"` so impls don't have to override unless they
72 /// want telemetry to know.
73 fn system(&self) -> &'static str {
74 "custom"
75 }
76}
77
78/// Bundle of optional connection handles. Cheap to clone — all inner
79/// types are reference-counted.
80#[derive(Clone, Default)]
81pub struct State {
82 pg: Option<sqlx::PgPool>,
83 redis: Option<Arc<redis::Client>>,
84 storage: Option<Arc<dyn StorageProvider>>,
85}
86
87impl State {
88 /// Attach a custom [`StorageProvider`]. Probes it immediately and
89 /// returns the error if the probe fails — same contract as the
90 /// other backends ("fail fast at boot").
91 pub async fn with_storage<S: StorageProvider>(mut self, storage: S) -> Result<Self> {
92 storage.probe().await?;
93 self.storage = Some(Arc::new(storage));
94 Ok(self)
95 }
96
97 /// Attach a pre-boxed provider without re-probing. Use when the
98 /// caller has already done their own validation, or when injecting
99 /// a mock from tests.
100 pub fn set_storage(&mut self, storage: Arc<dyn StorageProvider>) {
101 self.storage = Some(storage);
102 }
103
104 /// Whether a storage provider is wired.
105 pub fn has_storage(&self) -> bool {
106 self.storage.is_some()
107 }
108
109 /// Borrow the trait-object storage handle. Returns a `Config` error
110 /// if no provider was wired (scaffold without `--with-storage`, or
111 /// `STORAGE_BUCKET` unset in the generated init).
112 pub fn storage(&self) -> Result<&Arc<dyn StorageProvider>> {
113 self.storage.as_ref().ok_or_else(|| {
114 Error::Config(
115 "storage requested but no provider was wired at startup \
116 (scaffold with --with-storage to enable)"
117 .into(),
118 )
119 })
120 }
121
122 /// Build a `State` from environment variables. Tries each backend
123 /// independently; missing env vars produce `None` fields, not
124 /// errors. Connection failures DO error — if `DATABASE_URL` is set
125 /// but unreachable, that's a deploy-time problem and the service
126 /// should fail to start.
127 ///
128 /// Object storage is **not** initialized here — its concrete client
129 /// type (opendal::Operator, AWS SDK, etc.) is scaffold-time opt-in,
130 /// so this framework crate doesn't pull those deps. The scaffolded
131 /// `main.rs` calls [`State::with_storage`] separately when
132 /// `--with-storage` was used.
133 pub async fn from_env() -> Result<Self> {
134 let pg = match std::env::var("DATABASE_URL") {
135 Ok(url) => {
136 tracing::info!(target: "tonin::state", "connecting to postgres");
137 let pool = sqlx::postgres::PgPoolOptions::new()
138 .max_connections(default_pg_max_conns())
139 .connect(&url)
140 .await
141 .map_err(|e| Error::Config(format!("postgres connect failed: {e}")))?;
142 Some(pool)
143 }
144 Err(_) => None,
145 };
146
147 let redis = match std::env::var("REDIS_URL") {
148 Ok(url) => {
149 tracing::info!(target: "tonin::state", "connecting to redis");
150 let client = redis::Client::open(url)
151 .map_err(|e| Error::Config(format!("redis client init: {e}")))?;
152 // Eagerly verify reachability so a misconfigured cache fails fast.
153 let mut conn = client
154 .get_multiplexed_async_connection()
155 .await
156 .map_err(|e| Error::Config(format!("redis connect failed: {e}")))?;
157 let _: String = redis::cmd("PING")
158 .query_async(&mut conn)
159 .await
160 .map_err(|e| Error::Config(format!("redis PING failed: {e}")))?;
161 Some(Arc::new(client))
162 }
163 Err(_) => None,
164 };
165
166 Ok(Self {
167 pg,
168 redis,
169 storage: None,
170 })
171 }
172
173 /// Borrow the Postgres pool. Returns a `Config` error if
174 /// `DATABASE_URL` was not set at boot.
175 pub fn pg(&self) -> Result<&sqlx::PgPool> {
176 self.pg.as_ref().ok_or_else(|| {
177 Error::Config("postgres requested but DATABASE_URL was not set at startup".into())
178 })
179 }
180
181 /// Whether the Postgres pool is available.
182 pub fn has_pg(&self) -> bool {
183 self.pg.is_some()
184 }
185
186 /// Borrow the Redis client. Returns a `Config` error if `REDIS_URL`
187 /// was not set at boot.
188 pub fn redis(&self) -> Result<&redis::Client> {
189 self.redis.as_deref().ok_or_else(|| {
190 Error::Config("redis requested but REDIS_URL was not set at startup".into())
191 })
192 }
193
194 /// Whether the Redis client is available.
195 pub fn has_redis(&self) -> bool {
196 self.redis.is_some()
197 }
198}
199
200/// Default max pool size. Overridable via `TONIN_PG_MAX_CONNECTIONS`.
201fn default_pg_max_conns() -> u32 {
202 std::env::var("TONIN_PG_MAX_CONNECTIONS")
203 .ok()
204 .and_then(|s| s.parse().ok())
205 .unwrap_or(10)
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211 use std::sync::atomic::{AtomicUsize, Ordering};
212
213 /// In-memory mock implementing [`StorageProvider`]. Verifies the
214 /// trait is implementable without the opendal dep — same shape any
215 /// custom user impl would take.
216 struct MockStorage {
217 probes: AtomicUsize,
218 probe_fails: bool,
219 }
220
221 #[async_trait]
222 impl StorageProvider for MockStorage {
223 async fn probe(&self) -> Result<()> {
224 self.probes.fetch_add(1, Ordering::SeqCst);
225 if self.probe_fails {
226 Err(Error::Config("mock probe failure".into()))
227 } else {
228 Ok(())
229 }
230 }
231 fn system(&self) -> &'static str {
232 "memory"
233 }
234 }
235
236 #[tokio::test]
237 async fn empty_state_when_no_env_vars() {
238 // Guard against the test environment leaking these. If they
239 // happen to be set on the dev box, skip — the assertion below
240 // wouldn't hold but it's not a code bug.
241 if std::env::var("DATABASE_URL").is_ok() || std::env::var("REDIS_URL").is_ok() {
242 return;
243 }
244 let state = State::from_env().await.unwrap();
245 assert!(!state.has_pg());
246 assert!(!state.has_redis());
247 assert!(!state.has_storage());
248 assert!(state.pg().is_err());
249 assert!(state.redis().is_err());
250 assert!(state.storage().is_err());
251 }
252
253 #[tokio::test]
254 async fn with_storage_runs_probe() {
255 let state = State::default();
256 let storage = MockStorage {
257 probes: AtomicUsize::new(0),
258 probe_fails: false,
259 };
260 let state = state.with_storage(storage).await.unwrap();
261 assert!(state.has_storage());
262 assert_eq!(state.storage().unwrap().system(), "memory");
263 }
264
265 #[tokio::test]
266 async fn with_storage_propagates_probe_failure() {
267 let state = State::default();
268 let storage = MockStorage {
269 probes: AtomicUsize::new(0),
270 probe_fails: true,
271 };
272 match state.with_storage(storage).await {
273 Ok(_) => panic!("expected probe failure to propagate"),
274 Err(Error::Config(_)) => {}
275 Err(other) => panic!("expected Config, got {other:?}"),
276 }
277 }
278}