orchflow_core/state/
mod.rs1pub mod types;
2
3pub use types::{PaneState, SessionState, StateEvent};
4
5use crate::error::Result;
6use crate::storage::StateStore;
7use chrono::Utc;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11use uuid::Uuid;
12
13pub struct StateManager {
14 store: Arc<dyn StateStore>,
15 event_tx: broadcast::Sender<StateEvent>,
16
17 sessions: Arc<RwLock<HashMap<String, SessionState>>>,
19 panes: Arc<RwLock<HashMap<String, PaneState>>>,
20}
21
22impl StateManager {
23 pub fn new(store: Arc<dyn StateStore>) -> Self {
24 let (event_tx, _) = broadcast::channel(1024);
25
26 Self {
27 store,
28 event_tx,
29 sessions: Arc::new(RwLock::new(HashMap::new())),
30 panes: Arc::new(RwLock::new(HashMap::new())),
31 }
32 }
33
34 pub fn subscribe(&self) -> broadcast::Receiver<StateEvent> {
36 self.event_tx.subscribe()
37 }
38
39 pub async fn create_session(&self, name: String) -> Result<SessionState> {
42 let session = SessionState {
43 id: Uuid::new_v4().to_string(),
44 name,
45 created_at: Utc::now(),
46 updated_at: Utc::now(),
47 pane_ids: Vec::new(),
48 active_pane_id: None,
49 layout: None,
50 metadata: HashMap::new(),
51 };
52
53 let key = format!("session:{}", session.id);
55 self.store
56 .set(
57 &key,
58 serde_json::to_value(&session)
59 .map_err(crate::error::OrchflowError::Serialization)?,
60 )
61 .await
62 .map_err(crate::error::OrchflowError::State)?;
63
64 let mut sessions = self.sessions.write().await;
66 sessions.insert(session.id.clone(), session.clone());
67
68 let _ = self.event_tx.send(StateEvent::SessionCreated {
70 session: session.clone(),
71 });
72
73 Ok(session)
74 }
75
76 pub async fn get_session(&self, session_id: &str) -> Option<SessionState> {
77 let sessions = self.sessions.read().await;
78 sessions.get(session_id).cloned()
79 }
80
81 pub async fn list_sessions(&self) -> Vec<SessionState> {
82 let sessions = self.sessions.read().await;
83 sessions.values().cloned().collect()
84 }
85
86 pub async fn update_session(&self, session: SessionState) -> Result<()> {
87 let key = format!("session:{}", session.id);
89 self.store
90 .set(
91 &key,
92 serde_json::to_value(&session)
93 .map_err(crate::error::OrchflowError::Serialization)?,
94 )
95 .await
96 .map_err(crate::error::OrchflowError::State)?;
97
98 let mut sessions = self.sessions.write().await;
100 sessions.insert(session.id.clone(), session.clone());
101
102 let _ = self.event_tx.send(StateEvent::SessionUpdated { session });
104
105 Ok(())
106 }
107
108 pub async fn delete_session(&self, session_id: &str) -> Result<()> {
109 let key = format!("session:{session_id}");
111 self.store
112 .delete(&key)
113 .await
114 .map_err(crate::error::OrchflowError::State)?;
115
116 let mut sessions = self.sessions.write().await;
118 sessions.remove(session_id);
119
120 let _ = self.event_tx.send(StateEvent::SessionDeleted {
122 session_id: session_id.to_string(),
123 });
124
125 Ok(())
126 }
127
128 pub async fn create_pane(&self, pane: PaneState) -> Result<PaneState> {
131 let key = format!("pane:{}", pane.id);
133 self.store
134 .set(
135 &key,
136 serde_json::to_value(&pane).map_err(crate::error::OrchflowError::Serialization)?,
137 )
138 .await
139 .map_err(crate::error::OrchflowError::State)?;
140
141 let mut panes = self.panes.write().await;
143 panes.insert(pane.id.clone(), pane.clone());
144
145 if let Some(mut session) = self.get_session(&pane.session_id).await {
147 session.pane_ids.push(pane.id.clone());
148 session.updated_at = Utc::now();
149 let _ = self.update_session(session).await;
150 }
151
152 let _ = self
154 .event_tx
155 .send(StateEvent::PaneCreated { pane: pane.clone() });
156
157 Ok(pane)
158 }
159
160 pub async fn get_pane(&self, pane_id: &str) -> Option<PaneState> {
161 let panes = self.panes.read().await;
162 panes.get(pane_id).cloned()
163 }
164
165 pub async fn update_pane(&self, pane: PaneState) -> Result<()> {
166 let key = format!("pane:{}", pane.id);
168 self.store
169 .set(
170 &key,
171 serde_json::to_value(&pane).map_err(crate::error::OrchflowError::Serialization)?,
172 )
173 .await
174 .map_err(crate::error::OrchflowError::State)?;
175
176 let mut panes = self.panes.write().await;
178 panes.insert(pane.id.clone(), pane.clone());
179
180 let _ = self.event_tx.send(StateEvent::PaneUpdated { pane });
182
183 Ok(())
184 }
185
186 pub async fn delete_pane(&self, pane_id: &str) -> Result<()> {
187 let pane = {
189 let panes = self.panes.read().await;
190 panes.get(pane_id).cloned()
191 };
192
193 if let Some(pane) = pane {
194 let key = format!("pane:{pane_id}");
196 self.store
197 .delete(&key)
198 .await
199 .map_err(crate::error::OrchflowError::State)?;
200
201 let mut panes = self.panes.write().await;
203 panes.remove(pane_id);
204
205 if let Some(mut session) = self.get_session(&pane.session_id).await {
207 session.pane_ids.retain(|id| id != pane_id);
208 session.updated_at = Utc::now();
209 let _ = self.update_session(session).await;
210 }
211
212 let _ = self.event_tx.send(StateEvent::PaneDeleted {
214 pane_id: pane_id.to_string(),
215 });
216 }
217
218 Ok(())
219 }
220
221 pub async fn delete_panes_for_session(&self, session_id: &str) -> Result<()> {
222 let pane_ids: Vec<String> = {
223 let panes = self.panes.read().await;
224 panes
225 .values()
226 .filter(|p| p.session_id == session_id)
227 .map(|p| p.id.clone())
228 .collect()
229 };
230
231 for pane_id in pane_ids {
232 let _ = self.delete_pane(&pane_id).await;
233 }
234
235 Ok(())
236 }
237
238 pub async fn load_from_storage(&self) -> Result<()> {
240 let session_keys = self
242 .store
243 .list_keys(Some("session:"))
244 .await
245 .map_err(crate::error::OrchflowError::State)?;
246
247 for key in session_keys {
248 if let Ok(Some(value)) = self.store.get(&key).await {
249 if let Ok(session) = serde_json::from_value::<SessionState>(value) {
250 let mut sessions = self.sessions.write().await;
251 sessions.insert(session.id.clone(), session);
252 }
253 }
254 }
255
256 let pane_keys = self
258 .store
259 .list_keys(Some("pane:"))
260 .await
261 .map_err(crate::error::OrchflowError::State)?;
262
263 for key in pane_keys {
264 if let Ok(Some(value)) = self.store.get(&key).await {
265 if let Ok(pane) = serde_json::from_value::<PaneState>(value) {
266 let mut panes = self.panes.write().await;
267 panes.insert(pane.id.clone(), pane);
268 }
269 }
270 }
271
272 Ok(())
273 }
274}
275
276impl Clone for StateManager {
277 fn clone(&self) -> Self {
278 Self {
279 store: self.store.clone(),
280 event_tx: self.event_tx.clone(),
281 sessions: self.sessions.clone(),
282 panes: self.panes.clone(),
283 }
284 }
285}