thread_flow/incremental/backends/mod.rs
1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Concrete storage backend implementations for the incremental update system.
5//!
6//! This module provides database-specific implementations of the
7//! [`StorageBackend`](super::storage::StorageBackend) trait:
8//!
9//! - **Postgres** (`postgres-backend` feature): Full SQL backend for CLI deployment
10//! with connection pooling, prepared statements, and batch operations.
11//! - **D1** (`d1-backend` feature): Cloudflare D1 backend for edge deployment
12//! via the Cloudflare REST API.
13//! - **InMemory**: Simple in-memory backend for testing (always available).
14//!
15//! ## Backend Factory Pattern
16//!
17//! The [`create_backend`] factory function provides runtime backend selection
18//! based on deployment environment and feature flags:
19//!
20//! ```rust
21//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
22//!
23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24//! // CLI deployment with Postgres
25//! # #[cfg(feature = "postgres-backend")]
26//! let backend = create_backend(
27//! BackendType::Postgres,
28//! BackendConfig::Postgres {
29//! database_url: "postgresql://localhost/thread".to_string(),
30//! },
31//! ).await?;
32//!
33//! // Edge deployment with D1
34//! # #[cfg(feature = "d1-backend")]
35//! let backend = create_backend(
36//! BackendType::D1,
37//! BackendConfig::D1 {
38//! account_id: "your-account-id".to_string(),
39//! database_id: "your-db-id".to_string(),
40//! api_token: "your-token".to_string(),
41//! },
42//! ).await?;
43//!
44//! // Testing with in-memory storage (always available)
45//! let backend = create_backend(
46//! BackendType::InMemory,
47//! BackendConfig::InMemory,
48//! ).await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! ## Feature Gating
54//!
55//! Backend availability depends on cargo features:
56//!
57//! - `postgres-backend`: Enables [`PostgresIncrementalBackend`]
58//! - `d1-backend`: Enables [`D1IncrementalBackend`]
59//! - No features required: [`InMemoryStorage`] always available
60//!
61//! Attempting to use a disabled backend returns [`IncrementalError::UnsupportedBackend`].
62//!
63//! ## Deployment Scenarios
64//!
65//! ### CLI Deployment (Postgres)
66//!
67//! ```toml
68//! [dependencies]
69//! thread-flow = { version = "*", features = ["postgres-backend"] }
70//! ```
71//!
72//! ```rust
73//! # #[cfg(feature = "postgres-backend")]
74//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
75//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
76//!
77//! let backend = create_backend(
78//! BackendType::Postgres,
79//! BackendConfig::Postgres {
80//! database_url: std::env::var("DATABASE_URL")?,
81//! },
82//! ).await?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! ### Edge Deployment (D1)
88//!
89//! ```toml
90//! [dependencies]
91//! thread-flow = { version = "*", features = ["d1-backend", "worker"] }
92//! ```
93//!
94//! ```rust
95//! # #[cfg(feature = "d1-backend")]
96//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
97//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
98//!
99//! let backend = create_backend(
100//! BackendType::D1,
101//! BackendConfig::D1 {
102//! account_id: std::env::var("CF_ACCOUNT_ID")?,
103//! database_id: std::env::var("CF_DATABASE_ID")?,
104//! api_token: std::env::var("CF_API_TOKEN")?,
105//! },
106//! ).await?;
107//! # Ok(())
108//! # }
109//! ```
110//!
111//! ### Testing (InMemory)
112//!
113//! ```rust
114//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
115//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
116//!
117//! let backend = create_backend(
118//! BackendType::InMemory,
119//! BackendConfig::InMemory,
120//! ).await?;
121//! # Ok(())
122//! # }
123//! ```
124
125use super::storage::{InMemoryStorage, StorageBackend};
126use std::error::Error;
127use std::fmt;
128
129#[cfg(feature = "postgres-backend")]
130pub mod postgres;
131
132#[cfg(feature = "d1-backend")]
133pub mod d1;
134
135#[cfg(feature = "postgres-backend")]
136pub use postgres::PostgresIncrementalBackend;
137
138#[cfg(feature = "d1-backend")]
139pub use d1::D1IncrementalBackend;
140
141// ─── Error Types ──────────────────────────────────────────────────────────────
142
143/// Errors that can occur during backend initialization and operation.
144#[derive(Debug)]
145pub enum IncrementalError {
146 /// The requested backend is not available (feature flag disabled).
147 UnsupportedBackend(&'static str),
148
149 /// Backend initialization failed (connection error, invalid config, etc.).
150 InitializationFailed(String),
151
152 /// Propagated storage error from backend operations.
153 Storage(super::storage::StorageError),
154}
155
156impl fmt::Display for IncrementalError {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 match self {
159 IncrementalError::UnsupportedBackend(backend) => {
160 write!(
161 f,
162 "Backend '{}' is not available. Enable the corresponding feature flag.",
163 backend
164 )
165 }
166 IncrementalError::InitializationFailed(msg) => {
167 write!(f, "Backend initialization failed: {}", msg)
168 }
169 IncrementalError::Storage(err) => write!(f, "Storage error: {}", err),
170 }
171 }
172}
173
174impl Error for IncrementalError {
175 fn source(&self) -> Option<&(dyn Error + 'static)> {
176 match self {
177 IncrementalError::Storage(err) => Some(err),
178 _ => None,
179 }
180 }
181}
182
183impl From<super::storage::StorageError> for IncrementalError {
184 fn from(err: super::storage::StorageError) -> Self {
185 IncrementalError::Storage(err)
186 }
187}
188
189// ─── Backend Configuration ────────────────────────────────────────────────────
190
191/// Backend type selector for runtime backend selection.
192///
193/// Use this enum with [`create_backend`] to instantiate the appropriate
194/// storage backend based on deployment environment.
195#[derive(Debug, Clone, Copy, PartialEq, Eq)]
196pub enum BackendType {
197 /// PostgreSQL backend (requires `postgres-backend` feature).
198 ///
199 /// Primary backend for CLI deployment with connection pooling
200 /// and batch operations.
201 Postgres,
202
203 /// Cloudflare D1 backend (requires `d1-backend` feature).
204 ///
205 /// Primary backend for edge deployment via Cloudflare Workers.
206 D1,
207
208 /// In-memory backend (always available).
209 ///
210 /// Used for testing and development. Data is not persisted.
211 InMemory,
212}
213
214/// Configuration for backend initialization.
215///
216/// Each variant contains the connection parameters needed to initialize
217/// the corresponding backend type.
218#[derive(Debug, Clone)]
219pub enum BackendConfig {
220 /// PostgreSQL connection configuration.
221 Postgres {
222 /// PostgreSQL connection URL (e.g., `postgresql://localhost/thread`).
223 database_url: String,
224 },
225
226 /// Cloudflare D1 connection configuration.
227 D1 {
228 /// Cloudflare account ID.
229 account_id: String,
230 /// D1 database ID.
231 database_id: String,
232 /// Cloudflare API token with D1 read/write permissions.
233 api_token: String,
234 },
235
236 /// In-memory storage (no configuration needed).
237 InMemory,
238}
239
240// ─── Backend Factory ──────────────────────────────────────────────────────────
241
242/// Creates a storage backend based on the specified type and configuration.
243///
244/// This factory function provides runtime backend selection with compile-time
245/// feature gating. If a backend is requested but its feature flag is disabled,
246/// returns [`IncrementalError::UnsupportedBackend`].
247///
248/// # Arguments
249///
250/// * `backend_type` - The type of backend to instantiate.
251/// * `config` - Configuration parameters for the backend.
252///
253/// # Returns
254///
255/// A boxed trait object implementing [`StorageBackend`], or an error if:
256/// - The backend feature is disabled ([`IncrementalError::UnsupportedBackend`])
257/// - Backend initialization fails ([`IncrementalError::InitializationFailed`])
258/// - Configuration mismatch between `backend_type` and `config`
259///
260/// # Examples
261///
262/// ```rust
263/// use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
264///
265/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
266/// // Create in-memory backend (always available)
267/// let backend = create_backend(
268/// BackendType::InMemory,
269/// BackendConfig::InMemory,
270/// ).await?;
271///
272/// // Create Postgres backend (requires postgres-backend feature)
273/// # #[cfg(feature = "postgres-backend")]
274/// let backend = create_backend(
275/// BackendType::Postgres,
276/// BackendConfig::Postgres {
277/// database_url: "postgresql://localhost/thread".to_string(),
278/// },
279/// ).await?;
280/// # Ok(())
281/// # }
282/// ```
283///
284/// # Errors
285///
286/// - [`IncrementalError::UnsupportedBackend`]: Feature flag disabled for requested backend
287/// - [`IncrementalError::InitializationFailed`]: Connection failed, invalid config, or initialization error
288pub async fn create_backend(
289 backend_type: BackendType,
290 config: BackendConfig,
291) -> Result<Box<dyn StorageBackend>, IncrementalError> {
292 match (backend_type, config) {
293 // ── Postgres Backend ──────────────────────────────────────────────
294 (BackendType::Postgres, BackendConfig::Postgres { database_url }) => {
295 #[cfg(feature = "postgres-backend")]
296 {
297 PostgresIncrementalBackend::new(&database_url)
298 .await
299 .map(|b| Box::new(b) as Box<dyn StorageBackend>)
300 .map_err(|e| {
301 IncrementalError::InitializationFailed(format!(
302 "Postgres init failed: {}",
303 e
304 ))
305 })
306 }
307 #[cfg(not(feature = "postgres-backend"))]
308 {
309 let _ = database_url; // Suppress unused warning
310 Err(IncrementalError::UnsupportedBackend("postgres"))
311 }
312 }
313
314 // ── D1 Backend ────────────────────────────────────────────────────
315 (
316 BackendType::D1,
317 BackendConfig::D1 {
318 account_id,
319 database_id,
320 api_token,
321 },
322 ) => {
323 #[cfg(feature = "d1-backend")]
324 {
325 D1IncrementalBackend::new(account_id, database_id, api_token)
326 .map(|b| Box::new(b) as Box<dyn StorageBackend>)
327 .map_err(|e| {
328 IncrementalError::InitializationFailed(format!("D1 init failed: {}", e))
329 })
330 }
331 #[cfg(not(feature = "d1-backend"))]
332 {
333 let _ = (account_id, database_id, api_token); // Suppress unused warnings
334 Err(IncrementalError::UnsupportedBackend("d1"))
335 }
336 }
337
338 // ── InMemory Backend ──────────────────────────────────────────────
339 (BackendType::InMemory, BackendConfig::InMemory) => {
340 Ok(Box::new(InMemoryStorage::new()) as Box<dyn StorageBackend>)
341 }
342
343 // ── Configuration Mismatch ────────────────────────────────────────
344 _ => Err(IncrementalError::InitializationFailed(
345 "Backend type and configuration mismatch".to_string(),
346 )),
347 }
348}
349
350// ─── Tests ────────────────────────────────────────────────────────────────────
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[tokio::test]
357 async fn test_create_in_memory_backend() {
358 let result = create_backend(BackendType::InMemory, BackendConfig::InMemory).await;
359 assert!(result.is_ok());
360 }
361
362 #[tokio::test]
363 async fn test_configuration_mismatch() {
364 let result = create_backend(
365 BackendType::InMemory,
366 BackendConfig::Postgres {
367 database_url: "test".to_string(),
368 },
369 )
370 .await;
371 assert!(result.is_err());
372 if let Err(err) = result {
373 assert!(matches!(err, IncrementalError::InitializationFailed(_)));
374 }
375 }
376
377 #[cfg(not(feature = "postgres-backend"))]
378 #[tokio::test]
379 async fn test_postgres_backend_unavailable() {
380 let result = create_backend(
381 BackendType::Postgres,
382 BackendConfig::Postgres {
383 database_url: "postgresql://localhost/test".to_string(),
384 },
385 )
386 .await;
387 assert!(result.is_err());
388 if let Err(err) = result {
389 assert!(matches!(
390 err,
391 IncrementalError::UnsupportedBackend("postgres")
392 ));
393 }
394 }
395
396 #[cfg(not(feature = "d1-backend"))]
397 #[tokio::test]
398 async fn test_d1_backend_unavailable() {
399 let result = create_backend(
400 BackendType::D1,
401 BackendConfig::D1 {
402 account_id: "test".to_string(),
403 database_id: "test".to_string(),
404 api_token: "test".to_string(),
405 },
406 )
407 .await;
408 assert!(result.is_err());
409 if let Err(err) = result {
410 assert!(matches!(err, IncrementalError::UnsupportedBackend("d1")));
411 }
412 }
413
414 #[test]
415 fn test_incremental_error_display() {
416 let err = IncrementalError::UnsupportedBackend("test");
417 assert!(format!("{}", err).contains("not available"));
418
419 let err = IncrementalError::InitializationFailed("connection failed".to_string());
420 assert!(format!("{}", err).contains("connection failed"));
421 }
422
423 #[test]
424 fn test_backend_type_equality() {
425 assert_eq!(BackendType::InMemory, BackendType::InMemory);
426 assert_ne!(BackendType::Postgres, BackendType::D1);
427 }
428}