Skip to main content

thread_flow/incremental/backends/
mod.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Concrete storage backend implementations for the incremental update system.
5//!
6//! This module provides database-specific implementations of the
7//! [`StorageBackend`](super::storage::StorageBackend) trait:
8//!
9//! - **Postgres** (`postgres-backend` feature): Full SQL backend for CLI deployment
10//!   with connection pooling, prepared statements, and batch operations.
11//! - **D1** (`d1-backend` feature): Cloudflare D1 backend for edge deployment
12//!   via the Cloudflare REST API.
13//! - **InMemory**: Simple in-memory backend for testing (always available).
14//!
15//! ## Backend Factory Pattern
16//!
17//! The [`create_backend`] factory function provides runtime backend selection
18//! based on deployment environment and feature flags:
19//!
20//! ```rust
21//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
22//!
23//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24//! // CLI deployment with Postgres
25//! # #[cfg(feature = "postgres-backend")]
26//! let backend = create_backend(
27//!     BackendType::Postgres,
28//!     BackendConfig::Postgres {
29//!         database_url: "postgresql://localhost/thread".to_string(),
30//!     },
31//! ).await?;
32//!
33//! // Edge deployment with D1
34//! # #[cfg(feature = "d1-backend")]
35//! let backend = create_backend(
36//!     BackendType::D1,
37//!     BackendConfig::D1 {
38//!         account_id: "your-account-id".to_string(),
39//!         database_id: "your-db-id".to_string(),
40//!         api_token: "your-token".to_string(),
41//!     },
42//! ).await?;
43//!
44//! // Testing with in-memory storage (always available)
45//! let backend = create_backend(
46//!     BackendType::InMemory,
47//!     BackendConfig::InMemory,
48//! ).await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! ## Feature Gating
54//!
55//! Backend availability depends on cargo features:
56//!
57//! - `postgres-backend`: Enables [`PostgresIncrementalBackend`]
58//! - `d1-backend`: Enables [`D1IncrementalBackend`]
59//! - No features required: [`InMemoryStorage`] always available
60//!
61//! Attempting to use a disabled backend returns [`IncrementalError::UnsupportedBackend`].
62//!
63//! ## Deployment Scenarios
64//!
65//! ### CLI Deployment (Postgres)
66//!
67//! ```toml
68//! [dependencies]
69//! thread-flow = { version = "*", features = ["postgres-backend"] }
70//! ```
71//!
72//! ```rust
73//! # #[cfg(feature = "postgres-backend")]
74//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
75//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
76//!
77//! let backend = create_backend(
78//!     BackendType::Postgres,
79//!     BackendConfig::Postgres {
80//!         database_url: std::env::var("DATABASE_URL")?,
81//!     },
82//! ).await?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! ### Edge Deployment (D1)
88//!
89//! ```toml
90//! [dependencies]
91//! thread-flow = { version = "*", features = ["d1-backend", "worker"] }
92//! ```
93//!
94//! ```rust
95//! # #[cfg(feature = "d1-backend")]
96//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
97//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
98//!
99//! let backend = create_backend(
100//!     BackendType::D1,
101//!     BackendConfig::D1 {
102//!         account_id: std::env::var("CF_ACCOUNT_ID")?,
103//!         database_id: std::env::var("CF_DATABASE_ID")?,
104//!         api_token: std::env::var("CF_API_TOKEN")?,
105//!     },
106//! ).await?;
107//! # Ok(())
108//! # }
109//! ```
110//!
111//! ### Testing (InMemory)
112//!
113//! ```rust
114//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
115//! use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
116//!
117//! let backend = create_backend(
118//!     BackendType::InMemory,
119//!     BackendConfig::InMemory,
120//! ).await?;
121//! # Ok(())
122//! # }
123//! ```
124
125use super::storage::{InMemoryStorage, StorageBackend};
126use std::error::Error;
127use std::fmt;
128
129#[cfg(feature = "postgres-backend")]
130pub mod postgres;
131
132#[cfg(feature = "d1-backend")]
133pub mod d1;
134
135#[cfg(feature = "postgres-backend")]
136pub use postgres::PostgresIncrementalBackend;
137
138#[cfg(feature = "d1-backend")]
139pub use d1::D1IncrementalBackend;
140
141// ─── Error Types ──────────────────────────────────────────────────────────────
142
143/// Errors that can occur during backend initialization and operation.
144#[derive(Debug)]
145pub enum IncrementalError {
146    /// The requested backend is not available (feature flag disabled).
147    UnsupportedBackend(&'static str),
148
149    /// Backend initialization failed (connection error, invalid config, etc.).
150    InitializationFailed(String),
151
152    /// Propagated storage error from backend operations.
153    Storage(super::storage::StorageError),
154}
155
156impl fmt::Display for IncrementalError {
157    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158        match self {
159            IncrementalError::UnsupportedBackend(backend) => {
160                write!(
161                    f,
162                    "Backend '{}' is not available. Enable the corresponding feature flag.",
163                    backend
164                )
165            }
166            IncrementalError::InitializationFailed(msg) => {
167                write!(f, "Backend initialization failed: {}", msg)
168            }
169            IncrementalError::Storage(err) => write!(f, "Storage error: {}", err),
170        }
171    }
172}
173
174impl Error for IncrementalError {
175    fn source(&self) -> Option<&(dyn Error + 'static)> {
176        match self {
177            IncrementalError::Storage(err) => Some(err),
178            _ => None,
179        }
180    }
181}
182
183impl From<super::storage::StorageError> for IncrementalError {
184    fn from(err: super::storage::StorageError) -> Self {
185        IncrementalError::Storage(err)
186    }
187}
188
189// ─── Backend Configuration ────────────────────────────────────────────────────
190
191/// Backend type selector for runtime backend selection.
192///
193/// Use this enum with [`create_backend`] to instantiate the appropriate
194/// storage backend based on deployment environment.
195#[derive(Debug, Clone, Copy, PartialEq, Eq)]
196pub enum BackendType {
197    /// PostgreSQL backend (requires `postgres-backend` feature).
198    ///
199    /// Primary backend for CLI deployment with connection pooling
200    /// and batch operations.
201    Postgres,
202
203    /// Cloudflare D1 backend (requires `d1-backend` feature).
204    ///
205    /// Primary backend for edge deployment via Cloudflare Workers.
206    D1,
207
208    /// In-memory backend (always available).
209    ///
210    /// Used for testing and development. Data is not persisted.
211    InMemory,
212}
213
214/// Configuration for backend initialization.
215///
216/// Each variant contains the connection parameters needed to initialize
217/// the corresponding backend type.
218#[derive(Debug, Clone)]
219pub enum BackendConfig {
220    /// PostgreSQL connection configuration.
221    Postgres {
222        /// PostgreSQL connection URL (e.g., `postgresql://localhost/thread`).
223        database_url: String,
224    },
225
226    /// Cloudflare D1 connection configuration.
227    D1 {
228        /// Cloudflare account ID.
229        account_id: String,
230        /// D1 database ID.
231        database_id: String,
232        /// Cloudflare API token with D1 read/write permissions.
233        api_token: String,
234    },
235
236    /// In-memory storage (no configuration needed).
237    InMemory,
238}
239
240// ─── Backend Factory ──────────────────────────────────────────────────────────
241
242/// Creates a storage backend based on the specified type and configuration.
243///
244/// This factory function provides runtime backend selection with compile-time
245/// feature gating. If a backend is requested but its feature flag is disabled,
246/// returns [`IncrementalError::UnsupportedBackend`].
247///
248/// # Arguments
249///
250/// * `backend_type` - The type of backend to instantiate.
251/// * `config` - Configuration parameters for the backend.
252///
253/// # Returns
254///
255/// A boxed trait object implementing [`StorageBackend`], or an error if:
256/// - The backend feature is disabled ([`IncrementalError::UnsupportedBackend`])
257/// - Backend initialization fails ([`IncrementalError::InitializationFailed`])
258/// - Configuration mismatch between `backend_type` and `config`
259///
260/// # Examples
261///
262/// ```rust
263/// use thread_flow::incremental::backends::{BackendType, BackendConfig, create_backend};
264///
265/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
266/// // Create in-memory backend (always available)
267/// let backend = create_backend(
268///     BackendType::InMemory,
269///     BackendConfig::InMemory,
270/// ).await?;
271///
272/// // Create Postgres backend (requires postgres-backend feature)
273/// # #[cfg(feature = "postgres-backend")]
274/// let backend = create_backend(
275///     BackendType::Postgres,
276///     BackendConfig::Postgres {
277///         database_url: "postgresql://localhost/thread".to_string(),
278///     },
279/// ).await?;
280/// # Ok(())
281/// # }
282/// ```
283///
284/// # Errors
285///
286/// - [`IncrementalError::UnsupportedBackend`]: Feature flag disabled for requested backend
287/// - [`IncrementalError::InitializationFailed`]: Connection failed, invalid config, or initialization error
288pub async fn create_backend(
289    backend_type: BackendType,
290    config: BackendConfig,
291) -> Result<Box<dyn StorageBackend>, IncrementalError> {
292    match (backend_type, config) {
293        // ── Postgres Backend ──────────────────────────────────────────────
294        (BackendType::Postgres, BackendConfig::Postgres { database_url }) => {
295            #[cfg(feature = "postgres-backend")]
296            {
297                PostgresIncrementalBackend::new(&database_url)
298                    .await
299                    .map(|b| Box::new(b) as Box<dyn StorageBackend>)
300                    .map_err(|e| {
301                        IncrementalError::InitializationFailed(format!(
302                            "Postgres init failed: {}",
303                            e
304                        ))
305                    })
306            }
307            #[cfg(not(feature = "postgres-backend"))]
308            {
309                let _ = database_url; // Suppress unused warning
310                Err(IncrementalError::UnsupportedBackend("postgres"))
311            }
312        }
313
314        // ── D1 Backend ────────────────────────────────────────────────────
315        (
316            BackendType::D1,
317            BackendConfig::D1 {
318                account_id,
319                database_id,
320                api_token,
321            },
322        ) => {
323            #[cfg(feature = "d1-backend")]
324            {
325                D1IncrementalBackend::new(account_id, database_id, api_token)
326                    .map(|b| Box::new(b) as Box<dyn StorageBackend>)
327                    .map_err(|e| {
328                        IncrementalError::InitializationFailed(format!("D1 init failed: {}", e))
329                    })
330            }
331            #[cfg(not(feature = "d1-backend"))]
332            {
333                let _ = (account_id, database_id, api_token); // Suppress unused warnings
334                Err(IncrementalError::UnsupportedBackend("d1"))
335            }
336        }
337
338        // ── InMemory Backend ──────────────────────────────────────────────
339        (BackendType::InMemory, BackendConfig::InMemory) => {
340            Ok(Box::new(InMemoryStorage::new()) as Box<dyn StorageBackend>)
341        }
342
343        // ── Configuration Mismatch ────────────────────────────────────────
344        _ => Err(IncrementalError::InitializationFailed(
345            "Backend type and configuration mismatch".to_string(),
346        )),
347    }
348}
349
350// ─── Tests ────────────────────────────────────────────────────────────────────
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355
356    #[tokio::test]
357    async fn test_create_in_memory_backend() {
358        let result = create_backend(BackendType::InMemory, BackendConfig::InMemory).await;
359        assert!(result.is_ok());
360    }
361
362    #[tokio::test]
363    async fn test_configuration_mismatch() {
364        let result = create_backend(
365            BackendType::InMemory,
366            BackendConfig::Postgres {
367                database_url: "test".to_string(),
368            },
369        )
370        .await;
371        assert!(result.is_err());
372        if let Err(err) = result {
373            assert!(matches!(err, IncrementalError::InitializationFailed(_)));
374        }
375    }
376
377    #[cfg(not(feature = "postgres-backend"))]
378    #[tokio::test]
379    async fn test_postgres_backend_unavailable() {
380        let result = create_backend(
381            BackendType::Postgres,
382            BackendConfig::Postgres {
383                database_url: "postgresql://localhost/test".to_string(),
384            },
385        )
386        .await;
387        assert!(result.is_err());
388        if let Err(err) = result {
389            assert!(matches!(
390                err,
391                IncrementalError::UnsupportedBackend("postgres")
392            ));
393        }
394    }
395
396    #[cfg(not(feature = "d1-backend"))]
397    #[tokio::test]
398    async fn test_d1_backend_unavailable() {
399        let result = create_backend(
400            BackendType::D1,
401            BackendConfig::D1 {
402                account_id: "test".to_string(),
403                database_id: "test".to_string(),
404                api_token: "test".to_string(),
405            },
406        )
407        .await;
408        assert!(result.is_err());
409        if let Err(err) = result {
410            assert!(matches!(err, IncrementalError::UnsupportedBackend("d1")));
411        }
412    }
413
414    #[test]
415    fn test_incremental_error_display() {
416        let err = IncrementalError::UnsupportedBackend("test");
417        assert!(format!("{}", err).contains("not available"));
418
419        let err = IncrementalError::InitializationFailed("connection failed".to_string());
420        assert!(format!("{}", err).contains("connection failed"));
421    }
422
423    #[test]
424    fn test_backend_type_equality() {
425        assert_eq!(BackendType::InMemory, BackendType::InMemory);
426        assert_ne!(BackendType::Postgres, BackendType::D1);
427    }
428}