orchflow_core/manager/
mod.rs

1pub mod actions;
2pub mod events;
3pub mod execution;
4pub mod handlers;
5pub mod plugins;
6
7pub use actions::{Action, PaneType, ShellType};
8pub use events::Event;
9pub use plugins::{Plugin, PluginContext, PluginInfo, PluginMetadata};
10
11use crate::backend::MuxBackend;
12use crate::error::Result;
13use crate::state::StateManager;
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{broadcast, mpsc, RwLock};
18
19// Type alias for complex plugin storage
20pub type PluginMap = HashMap<String, Arc<tokio::sync::Mutex<Box<dyn Plugin>>>>;
21
22/// Builder for Manager with optional services
23pub struct ManagerBuilder {
24    mux_backend: Arc<dyn MuxBackend>,
25    state_manager: StateManager,
26    file_manager: Option<Arc<dyn FileManager>>,
27    search_provider: Option<Arc<dyn SearchProvider>>,
28    command_history: Option<Arc<dyn CommandHistory>>,
29}
30
31impl ManagerBuilder {
32    /// Create a new builder
33    pub fn new(mux_backend: Arc<dyn MuxBackend>, state_manager: StateManager) -> Self {
34        Self {
35            mux_backend,
36            state_manager,
37            file_manager: None,
38            search_provider: None,
39            command_history: None,
40        }
41    }
42
43    /// Set file manager
44    pub fn with_file_manager(mut self, file_manager: Arc<dyn FileManager>) -> Self {
45        self.file_manager = Some(file_manager);
46        self
47    }
48
49    /// Set search provider
50    pub fn with_search_provider(mut self, search_provider: Arc<dyn SearchProvider>) -> Self {
51        self.search_provider = Some(search_provider);
52        self
53    }
54
55    /// Set command history
56    pub fn with_command_history(mut self, command_history: Arc<dyn CommandHistory>) -> Self {
57        self.command_history = Some(command_history);
58        self
59    }
60
61    /// Build the Manager and start background tasks
62    pub fn build(self) -> Manager {
63        let (event_tx, _) = broadcast::channel(1024);
64        let (action_tx, mut action_rx) = mpsc::channel(100);
65
66        let manager = Manager {
67            state_manager: self.state_manager,
68            mux_backend: self.mux_backend,
69            plugins: Arc::new(RwLock::new(HashMap::new())),
70            plugin_subscriptions: Arc::new(RwLock::new(HashMap::new())),
71            event_tx: event_tx.clone(),
72            action_tx: action_tx.clone(),
73            file_manager: self.file_manager,
74            search_provider: self.search_provider,
75            command_history: self.command_history,
76        };
77
78        // Start action processor
79        let manager_clone = manager.clone();
80        tokio::spawn(async move {
81            while let Some((action, reply_tx)) = action_rx.recv().await {
82                let result = manager_clone.process_action(action).await;
83                let _ = reply_tx.send(result).await;
84            }
85        });
86
87        // Bridge state events to manager events
88        let state_event_rx = manager.state_manager.subscribe();
89        let manager_clone = manager.clone();
90        tokio::spawn(async move {
91            let mut rx = state_event_rx;
92            while let Ok(state_event) = rx.recv().await {
93                manager_clone.handle_state_event(state_event).await;
94            }
95        });
96
97        // Start plugin event dispatcher
98        let mut event_rx = event_tx.subscribe();
99        let manager_clone = manager.clone();
100        tokio::spawn(async move {
101            while let Ok(event) = event_rx.recv().await {
102                manager_clone.dispatch_event_to_plugins(&event).await;
103            }
104        });
105
106        manager
107    }
108}
109
110/// Core Manager - Transport-agnostic orchestration engine
111pub struct Manager {
112    // Unified state management
113    pub state_manager: StateManager,
114
115    // Backend infrastructure
116    pub mux_backend: Arc<dyn MuxBackend>,
117
118    // Plugin system
119    pub(crate) plugins: Arc<RwLock<PluginMap>>,
120    pub(crate) plugin_subscriptions: Arc<RwLock<HashMap<String, Vec<String>>>>,
121
122    // Event system
123    pub event_tx: broadcast::Sender<Event>,
124    pub(crate) action_tx: mpsc::Sender<(Action, mpsc::Sender<Result<Value>>)>,
125
126    // Optional services (injected via traits)
127    pub file_manager: Option<Arc<dyn FileManager>>,
128    pub search_provider: Option<Arc<dyn SearchProvider>>,
129    pub command_history: Option<Arc<dyn CommandHistory>>,
130}
131
132/// Trait for file management operations
133#[async_trait::async_trait]
134pub trait FileManager: Send + Sync {
135    async fn create_file(&self, path: &str, content: Option<&str>) -> Result<()>;
136    async fn read_file(&self, path: &str) -> Result<String>;
137    async fn delete_file(&self, path: &str) -> Result<()>;
138    async fn rename_file(&self, old_path: &str, new_path: &str) -> Result<()>;
139    async fn copy_file(&self, source: &str, destination: &str) -> Result<()>;
140    async fn move_file(&self, source: &str, destination: &str) -> Result<()>;
141    async fn create_directory(&self, path: &str) -> Result<()>;
142    async fn list_directory(&self, path: &str) -> Result<Vec<String>>;
143}
144
145/// Trait for search operations
146#[async_trait::async_trait]
147pub trait SearchProvider: Send + Sync {
148    async fn search_project(&self, pattern: &str, options: Value) -> Result<Value>;
149    async fn search_in_file(&self, file_path: &str, pattern: &str) -> Result<Value>;
150}
151
152/// Trait for command history
153#[async_trait::async_trait]
154pub trait CommandHistory: Send + Sync {
155    async fn add_command(&self, session_id: &str, command: &str) -> Result<()>;
156    async fn get_history(&self, session_id: &str, limit: usize) -> Result<Vec<String>>;
157    async fn search_history(&self, pattern: &str) -> Result<Vec<String>>;
158}
159
160impl Manager {
161    /// Create new manager with a MuxBackend and StateManager
162    /// For advanced configuration, use ManagerBuilder instead
163    pub fn new(mux_backend: Arc<dyn MuxBackend>, state_manager: StateManager) -> Self {
164        ManagerBuilder::new(mux_backend, state_manager).build()
165    }
166
167    /// Create a builder for advanced configuration
168    pub fn builder(
169        mux_backend: Arc<dyn MuxBackend>,
170        state_manager: StateManager,
171    ) -> ManagerBuilder {
172        ManagerBuilder::new(mux_backend, state_manager)
173    }
174
175    /// Get a reference to the file manager if available
176    pub fn file_manager(&self) -> Option<&Arc<dyn FileManager>> {
177        self.file_manager.as_ref()
178    }
179
180    /// Get a reference to the search provider if available
181    pub fn search_provider(&self) -> Option<&Arc<dyn SearchProvider>> {
182        self.search_provider.as_ref()
183    }
184
185    /// Get a reference to the command history if available
186    pub fn command_history(&self) -> Option<&Arc<dyn CommandHistory>> {
187        self.command_history.as_ref()
188    }
189
190    /// Execute an action
191    pub async fn execute_action(&self, action: Action) -> Result<Value> {
192        execution::execute_action(self, action).await
193    }
194
195    // Internal action processor
196    pub(crate) async fn process_action(&self, action: Action) -> Result<Value> {
197        execution::process_action(self, action).await
198    }
199
200    /// Subscribe a plugin to specific event types
201    pub async fn subscribe_plugin(&self, plugin_id: &str, events: Vec<String>) -> Result<()> {
202        let mut subscriptions = self.plugin_subscriptions.write().await;
203        subscriptions.insert(plugin_id.to_string(), events);
204        Ok(())
205    }
206
207    /// Emit an event
208    pub fn emit_event(&self, event: Event) {
209        let _ = self.event_tx.send(event);
210    }
211
212    /// Load a plugin
213    pub async fn load_plugin(&self, mut plugin: Box<dyn Plugin>) -> Result<()> {
214        let plugin_id = plugin.id().to_string();
215        let context = PluginContext {
216            manager: Arc::new(self.clone()),
217            plugin_id: plugin_id.clone(),
218        };
219
220        // Initialize plugin
221        plugin
222            .init(context)
223            .await
224            .map_err(crate::error::OrchflowError::Plugin)?;
225
226        // Store plugin
227        let mut plugins = self.plugins.write().await;
228        plugins.insert(plugin_id.clone(), Arc::new(tokio::sync::Mutex::new(plugin)));
229
230        // Emit event
231        self.emit_event(Event::PluginLoaded { id: plugin_id });
232
233        Ok(())
234    }
235
236    /// Unload a plugin
237    pub async fn unload_plugin(&self, plugin_id: &str) -> Result<()> {
238        let mut plugins = self.plugins.write().await;
239        if let Some(plugin) = plugins.remove(plugin_id) {
240            // Shutdown the plugin
241            let mut plugin_lock = plugin.lock().await;
242            plugin_lock
243                .shutdown()
244                .await
245                .map_err(crate::error::OrchflowError::Plugin)?;
246
247            // Remove plugin subscriptions
248            let mut subscriptions = self.plugin_subscriptions.write().await;
249            subscriptions.remove(plugin_id);
250
251            self.emit_event(Event::PluginUnloaded {
252                id: plugin_id.to_string(),
253            });
254            Ok(())
255        } else {
256            Err(crate::error::OrchflowError::NotFound(format!(
257                "Plugin not loaded: {plugin_id}"
258            )))
259        }
260    }
261
262    // Handle state events from StateManager
263    async fn handle_state_event(&self, event: crate::state::StateEvent) {
264        use crate::state::StateEvent;
265
266        match event {
267            StateEvent::SessionCreated { session } => {
268                self.emit_event(Event::SessionCreated { session });
269            }
270            StateEvent::SessionUpdated { session } => {
271                self.emit_event(Event::SessionUpdated { session });
272            }
273            StateEvent::SessionDeleted { session_id } => {
274                self.emit_event(Event::SessionDeleted { session_id });
275            }
276            StateEvent::PaneCreated { pane } => {
277                self.emit_event(Event::PaneCreated { pane });
278            }
279            _ => {} // Handle other state events as needed
280        }
281    }
282
283    /// Dispatch an event to all plugins
284    async fn dispatch_event_to_plugins(&self, event: &Event) {
285        plugins::dispatch_event_to_plugins(self, event).await;
286    }
287}
288
289// Make it cloneable for shared ownership
290impl Clone for Manager {
291    fn clone(&self) -> Self {
292        Self {
293            state_manager: self.state_manager.clone(),
294            mux_backend: self.mux_backend.clone(),
295            plugins: self.plugins.clone(),
296            plugin_subscriptions: self.plugin_subscriptions.clone(),
297            event_tx: self.event_tx.clone(),
298            action_tx: self.action_tx.clone(),
299            file_manager: self.file_manager.clone(),
300            search_provider: self.search_provider.clone(),
301            command_history: self.command_history.clone(),
302        }
303    }
304}