orchflow_core/state/
mod.rs

1pub mod types;
2
3pub use types::{PaneState, SessionState, StateEvent};
4
5use crate::error::Result;
6use crate::storage::StateStore;
7use chrono::Utc;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11use uuid::Uuid;
12
13pub struct StateManager {
14    store: Arc<dyn StateStore>,
15    event_tx: broadcast::Sender<StateEvent>,
16
17    // In-memory caches
18    sessions: Arc<RwLock<HashMap<String, SessionState>>>,
19    panes: Arc<RwLock<HashMap<String, PaneState>>>,
20}
21
22impl StateManager {
23    pub fn new(store: Arc<dyn StateStore>) -> Self {
24        let (event_tx, _) = broadcast::channel(1024);
25
26        Self {
27            store,
28            event_tx,
29            sessions: Arc::new(RwLock::new(HashMap::new())),
30            panes: Arc::new(RwLock::new(HashMap::new())),
31        }
32    }
33
34    /// Subscribe to state events
35    pub fn subscribe(&self) -> broadcast::Receiver<StateEvent> {
36        self.event_tx.subscribe()
37    }
38
39    // Session management
40
41    pub async fn create_session(&self, name: String) -> Result<SessionState> {
42        let session = SessionState {
43            id: Uuid::new_v4().to_string(),
44            name,
45            created_at: Utc::now(),
46            updated_at: Utc::now(),
47            pane_ids: Vec::new(),
48            active_pane_id: None,
49            layout: None,
50            metadata: HashMap::new(),
51        };
52
53        // Save to store
54        let key = format!("session:{}", session.id);
55        self.store
56            .set(
57                &key,
58                serde_json::to_value(&session)
59                    .map_err(crate::error::OrchflowError::Serialization)?,
60            )
61            .await
62            .map_err(crate::error::OrchflowError::State)?;
63
64        // Update cache
65        let mut sessions = self.sessions.write().await;
66        sessions.insert(session.id.clone(), session.clone());
67
68        // Emit event
69        let _ = self.event_tx.send(StateEvent::SessionCreated {
70            session: session.clone(),
71        });
72
73        Ok(session)
74    }
75
76    pub async fn get_session(&self, session_id: &str) -> Option<SessionState> {
77        let sessions = self.sessions.read().await;
78        sessions.get(session_id).cloned()
79    }
80
81    pub async fn list_sessions(&self) -> Vec<SessionState> {
82        let sessions = self.sessions.read().await;
83        sessions.values().cloned().collect()
84    }
85
86    pub async fn update_session(&self, session: SessionState) -> Result<()> {
87        // Save to store
88        let key = format!("session:{}", session.id);
89        self.store
90            .set(
91                &key,
92                serde_json::to_value(&session)
93                    .map_err(crate::error::OrchflowError::Serialization)?,
94            )
95            .await
96            .map_err(crate::error::OrchflowError::State)?;
97
98        // Update cache
99        let mut sessions = self.sessions.write().await;
100        sessions.insert(session.id.clone(), session.clone());
101
102        // Emit event
103        let _ = self.event_tx.send(StateEvent::SessionUpdated { session });
104
105        Ok(())
106    }
107
108    pub async fn delete_session(&self, session_id: &str) -> Result<()> {
109        // Delete from store
110        let key = format!("session:{session_id}");
111        self.store
112            .delete(&key)
113            .await
114            .map_err(crate::error::OrchflowError::State)?;
115
116        // Remove from cache
117        let mut sessions = self.sessions.write().await;
118        sessions.remove(session_id);
119
120        // Emit event
121        let _ = self.event_tx.send(StateEvent::SessionDeleted {
122            session_id: session_id.to_string(),
123        });
124
125        Ok(())
126    }
127
128    // Pane management
129
130    pub async fn create_pane(&self, pane: PaneState) -> Result<PaneState> {
131        // Save to store
132        let key = format!("pane:{}", pane.id);
133        self.store
134            .set(
135                &key,
136                serde_json::to_value(&pane).map_err(crate::error::OrchflowError::Serialization)?,
137            )
138            .await
139            .map_err(crate::error::OrchflowError::State)?;
140
141        // Update cache
142        let mut panes = self.panes.write().await;
143        panes.insert(pane.id.clone(), pane.clone());
144
145        // Update session
146        if let Some(mut session) = self.get_session(&pane.session_id).await {
147            session.pane_ids.push(pane.id.clone());
148            session.updated_at = Utc::now();
149            let _ = self.update_session(session).await;
150        }
151
152        // Emit event
153        let _ = self
154            .event_tx
155            .send(StateEvent::PaneCreated { pane: pane.clone() });
156
157        Ok(pane)
158    }
159
160    pub async fn get_pane(&self, pane_id: &str) -> Option<PaneState> {
161        let panes = self.panes.read().await;
162        panes.get(pane_id).cloned()
163    }
164
165    pub async fn update_pane(&self, pane: PaneState) -> Result<()> {
166        // Save to store
167        let key = format!("pane:{}", pane.id);
168        self.store
169            .set(
170                &key,
171                serde_json::to_value(&pane).map_err(crate::error::OrchflowError::Serialization)?,
172            )
173            .await
174            .map_err(crate::error::OrchflowError::State)?;
175
176        // Update cache
177        let mut panes = self.panes.write().await;
178        panes.insert(pane.id.clone(), pane.clone());
179
180        // Emit event
181        let _ = self.event_tx.send(StateEvent::PaneUpdated { pane });
182
183        Ok(())
184    }
185
186    pub async fn delete_pane(&self, pane_id: &str) -> Result<()> {
187        // Get pane to find session
188        let pane = {
189            let panes = self.panes.read().await;
190            panes.get(pane_id).cloned()
191        };
192
193        if let Some(pane) = pane {
194            // Delete from store
195            let key = format!("pane:{pane_id}");
196            self.store
197                .delete(&key)
198                .await
199                .map_err(crate::error::OrchflowError::State)?;
200
201            // Remove from cache
202            let mut panes = self.panes.write().await;
203            panes.remove(pane_id);
204
205            // Update session
206            if let Some(mut session) = self.get_session(&pane.session_id).await {
207                session.pane_ids.retain(|id| id != pane_id);
208                session.updated_at = Utc::now();
209                let _ = self.update_session(session).await;
210            }
211
212            // Emit event
213            let _ = self.event_tx.send(StateEvent::PaneDeleted {
214                pane_id: pane_id.to_string(),
215            });
216        }
217
218        Ok(())
219    }
220
221    pub async fn delete_panes_for_session(&self, session_id: &str) -> Result<()> {
222        let pane_ids: Vec<String> = {
223            let panes = self.panes.read().await;
224            panes
225                .values()
226                .filter(|p| p.session_id == session_id)
227                .map(|p| p.id.clone())
228                .collect()
229        };
230
231        for pane_id in pane_ids {
232            let _ = self.delete_pane(&pane_id).await;
233        }
234
235        Ok(())
236    }
237
238    /// Load state from storage on startup
239    pub async fn load_from_storage(&self) -> Result<()> {
240        // Load sessions
241        let session_keys = self
242            .store
243            .list_keys(Some("session:"))
244            .await
245            .map_err(crate::error::OrchflowError::State)?;
246
247        for key in session_keys {
248            if let Ok(Some(value)) = self.store.get(&key).await {
249                if let Ok(session) = serde_json::from_value::<SessionState>(value) {
250                    let mut sessions = self.sessions.write().await;
251                    sessions.insert(session.id.clone(), session);
252                }
253            }
254        }
255
256        // Load panes
257        let pane_keys = self
258            .store
259            .list_keys(Some("pane:"))
260            .await
261            .map_err(crate::error::OrchflowError::State)?;
262
263        for key in pane_keys {
264            if let Ok(Some(value)) = self.store.get(&key).await {
265                if let Ok(pane) = serde_json::from_value::<PaneState>(value) {
266                    let mut panes = self.panes.write().await;
267                    panes.insert(pane.id.clone(), pane);
268                }
269            }
270        }
271
272        Ok(())
273    }
274}
275
276impl Clone for StateManager {
277    fn clone(&self) -> Self {
278        Self {
279            store: self.store.clone(),
280            event_tx: self.event_tx.clone(),
281            sessions: self.sessions.clone(),
282            panes: self.panes.clone(),
283        }
284    }
285}