Skip to main content

ironflow_api/
state.rs

1//! Application state and dependency injection.
2//!
3//! [`AppState`] holds the shared [`RunStore`] and [`Engine`] used by all handlers.
4
5use std::sync::Arc;
6#[cfg(feature = "prometheus")]
7use std::sync::OnceLock;
8
9use axum::extract::FromRef;
10#[cfg(feature = "prometheus")]
11use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
12use tokio::sync::broadcast;
13use uuid::Uuid;
14
15use ironflow_auth::jwt::JwtConfig;
16use ironflow_engine::engine::Engine;
17use ironflow_engine::notify::Event;
18use ironflow_store::api_key_store::ApiKeyStore;
19use ironflow_store::entities::Run;
20use ironflow_store::store::RunStore;
21use ironflow_store::user_store::UserStore;
22
23use crate::error::ApiError;
24
25/// Global application state.
26///
27/// Holds the shared run store and engine, extracted by handlers using Axum's
28/// state extraction mechanism.
29///
30/// # Examples
31///
32/// ```no_run
33/// use ironflow_api::state::AppState;
34/// use ironflow_auth::jwt::JwtConfig;
35/// use ironflow_store::prelude::*;
36/// use ironflow_store::api_key_store::ApiKeyStore;
37/// use ironflow_engine::engine::Engine;
38/// use ironflow_core::providers::claude::ClaudeCodeProvider;
39/// use std::sync::Arc;
40///
41/// # async fn example() {
42/// let store = Arc::new(InMemoryStore::new());
43/// let user_store: Arc<dyn UserStore> = Arc::new(InMemoryStore::new());
44/// let api_key_store: Arc<dyn ApiKeyStore> = Arc::new(InMemoryStore::new());
45/// let provider = Arc::new(ClaudeCodeProvider::new());
46/// let engine = Arc::new(Engine::new(store.clone(), provider));
47/// let jwt_config = Arc::new(JwtConfig {
48///     secret: "secret".to_string(),
49///     access_token_ttl_secs: 900,
50///     refresh_token_ttl_secs: 604800,
51///     cookie_domain: None,
52///     cookie_secure: false,
53/// });
54/// let broadcaster = ironflow_api::sse::SseBroadcaster::new();
55/// let state = AppState::new(store, user_store, api_key_store, engine, jwt_config, "token".to_string(), broadcaster.sender());
56/// # }
57/// ```
58#[derive(Clone)]
59pub struct AppState {
60    /// The backing store for runs and steps.
61    pub store: Arc<dyn RunStore>,
62    /// The backing store for users.
63    pub user_store: Arc<dyn UserStore>,
64    /// The backing store for API keys.
65    pub api_key_store: Arc<dyn ApiKeyStore>,
66    /// The workflow orchestration engine.
67    pub engine: Arc<Engine>,
68    /// JWT configuration for auth tokens.
69    pub jwt_config: Arc<JwtConfig>,
70    /// Static token for worker-to-API authentication.
71    pub worker_token: String,
72    /// Broadcast sender for SSE event streaming.
73    pub event_sender: broadcast::Sender<Event>,
74    /// Prometheus metrics handle (only when `prometheus` feature is enabled).
75    #[cfg(feature = "prometheus")]
76    pub prometheus_handle: PrometheusHandle,
77}
78
79impl FromRef<AppState> for Arc<dyn RunStore> {
80    fn from_ref(state: &AppState) -> Self {
81        Arc::clone(&state.store)
82    }
83}
84
85impl FromRef<AppState> for Arc<dyn UserStore> {
86    fn from_ref(state: &AppState) -> Self {
87        Arc::clone(&state.user_store)
88    }
89}
90
91impl FromRef<AppState> for Arc<dyn ApiKeyStore> {
92    fn from_ref(state: &AppState) -> Self {
93        Arc::clone(&state.api_key_store)
94    }
95}
96
97impl FromRef<AppState> for Arc<JwtConfig> {
98    fn from_ref(state: &AppState) -> Self {
99        Arc::clone(&state.jwt_config)
100    }
101}
102
103#[cfg(feature = "prometheus")]
104impl FromRef<AppState> for PrometheusHandle {
105    fn from_ref(state: &AppState) -> Self {
106        state.prometheus_handle.clone()
107    }
108}
109
110impl AppState {
111    /// Fetch a run by ID or return 404.
112    ///
113    /// # Errors
114    ///
115    /// Returns `ApiError::RunNotFound` if the run does not exist.
116    /// Returns `ApiError::Store` if there is a store error.
117    /// Create a new `AppState`.
118    ///
119    /// When the `prometheus` feature is enabled, a global Prometheus recorder
120    /// is installed (once) and its handle is stored in the state.
121    ///
122    /// # Panics
123    ///
124    /// Panics if a Prometheus recorder cannot be installed (should only
125    /// happen if another incompatible recorder was set elsewhere).
126    pub fn new(
127        store: Arc<dyn RunStore>,
128        user_store: Arc<dyn UserStore>,
129        api_key_store: Arc<dyn ApiKeyStore>,
130        engine: Arc<Engine>,
131        jwt_config: Arc<JwtConfig>,
132        worker_token: String,
133        event_sender: broadcast::Sender<Event>,
134    ) -> Self {
135        Self {
136            store,
137            user_store,
138            api_key_store,
139            engine,
140            jwt_config,
141            worker_token,
142            event_sender,
143            #[cfg(feature = "prometheus")]
144            prometheus_handle: Self::global_prometheus_handle(),
145        }
146    }
147
148    /// Install (or reuse) a global Prometheus recorder and return its handle.
149    #[cfg(feature = "prometheus")]
150    fn global_prometheus_handle() -> PrometheusHandle {
151        static HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
152        HANDLE
153            .get_or_init(|| {
154                PrometheusBuilder::new()
155                    .install_recorder()
156                    .expect("failed to install Prometheus recorder")
157            })
158            .clone()
159    }
160
161    /// Fetch a run by ID or return 404.
162    ///
163    /// # Errors
164    ///
165    /// Returns `ApiError::RunNotFound` if the run does not exist.
166    /// Returns `ApiError::Store` if there is a store error.
167    pub async fn get_run_or_404(&self, id: Uuid) -> Result<Run, ApiError> {
168        self.store
169            .get_run(id)
170            .await
171            .map_err(ApiError::from)?
172            .ok_or(ApiError::RunNotFound(id))
173    }
174}
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use ironflow_core::providers::claude::ClaudeCodeProvider;
180    use ironflow_store::memory::InMemoryStore;
181
182    fn test_state() -> AppState {
183        let store = Arc::new(InMemoryStore::new());
184        let user_store: Arc<dyn UserStore> = Arc::new(InMemoryStore::new());
185        let api_key_store: Arc<dyn ApiKeyStore> = Arc::new(InMemoryStore::new());
186        let provider = Arc::new(ClaudeCodeProvider::new());
187        let engine = Arc::new(Engine::new(store.clone(), provider));
188        let jwt_config = Arc::new(JwtConfig {
189            secret: "test-secret".to_string(),
190            access_token_ttl_secs: 900,
191            refresh_token_ttl_secs: 604800,
192            cookie_domain: None,
193            cookie_secure: false,
194        });
195        let (event_sender, _) = broadcast::channel::<Event>(1);
196        AppState::new(
197            store,
198            user_store,
199            api_key_store,
200            engine,
201            jwt_config,
202            "test-worker-token".to_string(),
203            event_sender,
204        )
205    }
206
207    #[test]
208    fn app_state_cloneable() {
209        let state = test_state();
210        let _cloned = state.clone();
211    }
212
213    #[test]
214    fn app_state_from_ref() {
215        let state = test_state();
216        let extracted: Arc<dyn RunStore> = Arc::from_ref(&state);
217        assert!(Arc::ptr_eq(&extracted, &state.store));
218    }
219}