Skip to main content

fresh_plugin_runtime/
thread.rs

1//! Plugin Thread: Dedicated thread for TypeScript plugin execution
2//!
3//! This module implements a dedicated thread architecture for plugin execution,
4//! using QuickJS as the JavaScript runtime with oxc for TypeScript transpilation.
5//!
6//! Architecture:
7//! - Main thread (UI) sends requests to plugin thread via channel
8//! - Plugin thread owns QuickJS runtime and persistent tokio runtime
9//! - Results are sent back via the existing PluginCommand channel
10//! - Async operations complete naturally without runtime destruction
11
12use crate::backend::quickjs_backend::{PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25// Re-export PluginConfig from fresh-core
26pub use fresh_core::config::PluginConfig;
27
/// Request messages sent to the plugin thread.
///
/// Variants that carry a `response` sender are answered on that oneshot
/// channel; variants without one are fire-and-forget.
#[derive(Debug)]
pub enum PluginRequest {
    /// Load a single plugin from the file at `path`.
    LoadPlugin {
        path: PathBuf,
        response: oneshot::Sender<Result<()>>,
    },

    /// Resolve an async callback with a result (for async operations like SpawnProcess, Delay).
    /// `result_json` is the JSON-encoded value handed to the callback.
    ResolveCallback {
        callback_id: fresh_core::api::JsCallbackId,
        result_json: String,
    },

    /// Reject an async callback with an error message.
    RejectCallback {
        callback_id: fresh_core::api::JsCallbackId,
        error: String,
    },

    /// Load all plugins from a directory; the response carries the error messages.
    LoadPluginsFromDir {
        dir: PathBuf,
        response: oneshot::Sender<Vec<String>>,
    },

    /// Load all plugins from a directory with config support.
    /// Returns (errors, discovered_plugins) where discovered_plugins contains
    /// all found plugins with their paths and enabled status.
    LoadPluginsFromDirWithConfig {
        dir: PathBuf,
        plugin_configs: HashMap<String, PluginConfig>,
        response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
    },

    /// Unload a plugin by name.
    UnloadPlugin {
        name: String,
        response: oneshot::Sender<Result<()>>,
    },

    /// Reload a plugin by name.
    ReloadPlugin {
        name: String,
        response: oneshot::Sender<Result<()>>,
    },

    /// Execute a plugin action.
    ExecuteAction {
        action_name: String,
        response: oneshot::Sender<Result<()>>,
    },

    /// Run a hook (fire-and-forget, no response needed).
    RunHook { hook_name: String, args: HookArgs },

    /// Check if any handlers are registered for a hook.
    HasHookHandlers {
        hook_name: String,
        response: oneshot::Sender<bool>,
    },

    /// List all loaded plugins.
    ListPlugins {
        response: oneshot::Sender<Vec<TsPluginInfo>>,
    },

    /// Shutdown the plugin thread.
    Shutdown,
}
99
/// Simple oneshot channel implementation.
///
/// A thin wrapper over a `std::sync::mpsc::sync_channel` with a buffer of one
/// slot. [`Sender::send`] consumes the sender, so at most one value is ever
/// sent and that send never blocks.
pub mod oneshot {
    use std::fmt;
    use std::sync::mpsc;

    /// Sending half of the channel; consumed by [`Sender::send`].
    pub struct Sender<T>(mpsc::SyncSender<T>);

    /// Receiving half of the channel.
    pub struct Receiver<T>(mpsc::Receiver<T>);

    // Hand-written so that `T` does not need to implement `Debug`.
    impl<T> fmt::Debug for Sender<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Sender").finish()
        }
    }

    impl<T> fmt::Debug for Receiver<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Receiver").finish()
        }
    }

    impl<T> Sender<T> {
        /// Send `value` to the receiver.
        ///
        /// # Errors
        /// Returns the value back as `Err` if the receiver has been dropped.
        pub fn send(self, value: T) -> Result<(), T> {
            self.0
                .send(value)
                .map_err(|mpsc::SendError(unsent)| unsent)
        }
    }

    impl<T> Receiver<T> {
        /// Block until the value arrives or the sender is dropped without sending.
        pub fn recv(self) -> Result<T, mpsc::RecvError> {
            self.0.recv()
        }

        /// Block for at most `timeout` waiting for the value.
        ///
        /// Consumes the receiver, so a value sent after a timeout is lost.
        pub fn recv_timeout(
            self,
            timeout: std::time::Duration,
        ) -> Result<T, mpsc::RecvTimeoutError> {
            self.0.recv_timeout(timeout)
        }

        /// Poll for the value without blocking.
        pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
            self.0.try_recv()
        }
    }

    /// Create a connected sender/receiver pair.
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        // One buffered slot lets the single send complete without a waiting receiver.
        let (tx, rx) = mpsc::sync_channel(1);
        (Sender(tx), Receiver(rx))
    }
}
150
/// Handle to the plugin thread for sending requests.
///
/// Dropping the handle calls `shutdown`, which closes the request channel
/// and joins the thread.
pub struct PluginThreadHandle {
    /// Channel to send requests to the plugin thread.
    /// Wrapped in Option so we can drop it to signal shutdown.
    request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,

    /// Thread join handle; `None` once the thread has been joined.
    thread_handle: Option<JoinHandle<()>>,

    /// State snapshot handle for editor to update
    state_snapshot: Arc<RwLock<EditorStateSnapshot>>,

    /// Pending response senders for async operations (shared with runtime)
    pending_responses: PendingResponses,

    /// Receiver for plugin commands (polled by editor directly)
    command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
}
169
170impl PluginThreadHandle {
171    /// Create a new plugin thread and return its handle
172    pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
173        tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
174
175        // Create channel for plugin commands
176        let (command_sender, command_receiver) = std::sync::mpsc::channel();
177
178        // Create editor state snapshot for query API
179        let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
180
181        // Create pending responses map (shared between handle and runtime)
182        let pending_responses: PendingResponses =
183            Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
184        let thread_pending_responses = Arc::clone(&pending_responses);
185
186        // Create channel for requests (unbounded allows sync send, async recv)
187        let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
188
189        // Clone state snapshot for the thread
190        let thread_state_snapshot = Arc::clone(&state_snapshot);
191
192        // Spawn the plugin thread
193        tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
194        let thread_handle = thread::spawn(move || {
195            tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
196            // Create tokio runtime for the plugin thread
197            let rt = match tokio::runtime::Builder::new_current_thread()
198                .enable_all()
199                .build()
200            {
201                Ok(rt) => {
202                    tracing::debug!("Plugin thread: tokio runtime created successfully");
203                    rt
204                }
205                Err(e) => {
206                    tracing::error!("Failed to create plugin thread runtime: {}", e);
207                    return;
208                }
209            };
210
211            // Create QuickJS runtime with state
212            tracing::debug!("Plugin thread: creating QuickJS runtime");
213            let runtime = match QuickJsBackend::with_state_and_responses(
214                Arc::clone(&thread_state_snapshot),
215                command_sender,
216                thread_pending_responses,
217                services.clone(),
218            ) {
219                Ok(rt) => {
220                    tracing::debug!("Plugin thread: QuickJS runtime created successfully");
221                    rt
222                }
223                Err(e) => {
224                    tracing::error!("Failed to create QuickJS runtime: {}", e);
225                    return;
226                }
227            };
228
229            // Create internal manager state
230            let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
231
232            // Run the event loop with a LocalSet to allow concurrent task execution
233            tracing::debug!("Plugin thread: starting event loop with LocalSet");
234            let local = tokio::task::LocalSet::new();
235            local.block_on(&rt, async {
236                // Wrap runtime in RefCell for interior mutability during concurrent operations
237                let runtime = Rc::new(RefCell::new(runtime));
238                tracing::debug!("Plugin thread: entering plugin_thread_loop");
239                plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
240            });
241
242            tracing::info!("Plugin thread shutting down");
243        });
244
245        tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
246        tracing::info!("Plugin thread spawned");
247
248        Ok(Self {
249            request_sender: Some(request_sender),
250            thread_handle: Some(thread_handle),
251            state_snapshot,
252            pending_responses,
253            command_receiver,
254        })
255    }
256
257    /// Check if the plugin thread is still alive
258    pub fn is_alive(&self) -> bool {
259        self.thread_handle
260            .as_ref()
261            .map(|h| !h.is_finished())
262            .unwrap_or(false)
263    }
264
265    /// Check thread health and panic if the plugin thread died due to a panic.
266    /// This propagates plugin thread panics to the calling thread.
267    /// Call this periodically to detect plugin thread failures.
268    pub fn check_thread_health(&mut self) {
269        if let Some(handle) = &self.thread_handle {
270            if handle.is_finished() {
271                tracing::error!(
272                    "check_thread_health: plugin thread is finished, checking for panic"
273                );
274                // Thread finished - take ownership and check result
275                if let Some(handle) = self.thread_handle.take() {
276                    match handle.join() {
277                        Ok(()) => {
278                            tracing::warn!("Plugin thread exited normally (unexpected)");
279                        }
280                        Err(panic_payload) => {
281                            // Re-panic with the original panic message to propagate it
282                            std::panic::resume_unwind(panic_payload);
283                        }
284                    }
285                }
286            }
287        }
288    }
289
290    /// Deliver a response to a pending async operation in the plugin
291    ///
292    /// This is called by the editor after processing a command that requires a response.
293    pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
294        // First try to find a pending Rust request (oneshot channel)
295        if respond_to_pending(&self.pending_responses, response.clone()) {
296            return;
297        }
298
299        // If not found, it must be a JS callback
300        use fresh_core::api::{JsCallbackId, PluginResponse};
301
302        match response {
303            PluginResponse::VirtualBufferCreated {
304                request_id,
305                buffer_id,
306                split_id,
307            } => {
308                // Return an object with bufferId and splitId (camelCase for JS)
309                let result = serde_json::json!({
310                    "bufferId": buffer_id.0,
311                    "splitId": split_id.map(|s| s.0)
312                });
313                self.resolve_callback(JsCallbackId(request_id), result.to_string());
314            }
315            PluginResponse::LspRequest { request_id, result } => match result {
316                Ok(value) => {
317                    self.resolve_callback(JsCallbackId(request_id), value.to_string());
318                }
319                Err(e) => {
320                    self.reject_callback(JsCallbackId(request_id), e);
321                }
322            },
323            PluginResponse::HighlightsComputed { request_id, spans } => {
324                let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
325                self.resolve_callback(JsCallbackId(request_id), result);
326            }
327            PluginResponse::BufferText { request_id, text } => match text {
328                Ok(content) => {
329                    // JSON stringify the content string
330                    let result =
331                        serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
332                    self.resolve_callback(JsCallbackId(request_id), result);
333                }
334                Err(e) => {
335                    self.reject_callback(JsCallbackId(request_id), e);
336                }
337            },
338            PluginResponse::CompositeBufferCreated {
339                request_id,
340                buffer_id,
341            } => {
342                // Return just the buffer_id number, not an object
343                self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
344            }
345            PluginResponse::LineStartPosition {
346                request_id,
347                position,
348            } => {
349                // Return the position as a number or null
350                let result =
351                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
352                self.resolve_callback(JsCallbackId(request_id), result);
353            }
354            PluginResponse::LineEndPosition {
355                request_id,
356                position,
357            } => {
358                // Return the position as a number or null
359                let result =
360                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
361                self.resolve_callback(JsCallbackId(request_id), result);
362            }
363            PluginResponse::BufferLineCount { request_id, count } => {
364                // Return the count as a number or null
365                let result = serde_json::to_string(&count).unwrap_or_else(|_| "null".to_string());
366                self.resolve_callback(JsCallbackId(request_id), result);
367            }
368        }
369    }
370
371    /// Load a plugin from a file (blocking)
372    pub fn load_plugin(&self, path: &Path) -> Result<()> {
373        let (tx, rx) = oneshot::channel();
374        self.request_sender
375            .as_ref()
376            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
377            .send(PluginRequest::LoadPlugin {
378                path: path.to_path_buf(),
379                response: tx,
380            })
381            .map_err(|_| anyhow!("Plugin thread not responding"))?;
382
383        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
384    }
385
386    /// Load all plugins from a directory (blocking)
387    pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
388        let (tx, rx) = oneshot::channel();
389        let Some(sender) = self.request_sender.as_ref() else {
390            return vec!["Plugin thread shut down".to_string()];
391        };
392        if sender
393            .send(PluginRequest::LoadPluginsFromDir {
394                dir: dir.to_path_buf(),
395                response: tx,
396            })
397            .is_err()
398        {
399            return vec!["Plugin thread not responding".to_string()];
400        }
401
402        rx.recv()
403            .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
404    }
405
406    /// Load all plugins from a directory with config support (blocking)
407    /// Returns (errors, discovered_plugins) where discovered_plugins is a map of
408    /// plugin name -> PluginConfig with paths populated.
409    pub fn load_plugins_from_dir_with_config(
410        &self,
411        dir: &Path,
412        plugin_configs: &HashMap<String, PluginConfig>,
413    ) -> (Vec<String>, HashMap<String, PluginConfig>) {
414        let (tx, rx) = oneshot::channel();
415        let Some(sender) = self.request_sender.as_ref() else {
416            return (vec!["Plugin thread shut down".to_string()], HashMap::new());
417        };
418        if sender
419            .send(PluginRequest::LoadPluginsFromDirWithConfig {
420                dir: dir.to_path_buf(),
421                plugin_configs: plugin_configs.clone(),
422                response: tx,
423            })
424            .is_err()
425        {
426            return (
427                vec!["Plugin thread not responding".to_string()],
428                HashMap::new(),
429            );
430        }
431
432        rx.recv()
433            .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
434    }
435
436    /// Unload a plugin (blocking)
437    pub fn unload_plugin(&self, name: &str) -> Result<()> {
438        let (tx, rx) = oneshot::channel();
439        self.request_sender
440            .as_ref()
441            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
442            .send(PluginRequest::UnloadPlugin {
443                name: name.to_string(),
444                response: tx,
445            })
446            .map_err(|_| anyhow!("Plugin thread not responding"))?;
447
448        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
449    }
450
451    /// Reload a plugin (blocking)
452    pub fn reload_plugin(&self, name: &str) -> Result<()> {
453        let (tx, rx) = oneshot::channel();
454        self.request_sender
455            .as_ref()
456            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
457            .send(PluginRequest::ReloadPlugin {
458                name: name.to_string(),
459                response: tx,
460            })
461            .map_err(|_| anyhow!("Plugin thread not responding"))?;
462
463        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
464    }
465
466    /// Execute a plugin action (non-blocking)
467    ///
468    /// Returns a receiver that will receive the result when the action completes.
469    /// The caller should poll this while processing commands to avoid deadlock.
470    pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
471        tracing::trace!("execute_action_async: starting action '{}'", action_name);
472        let (tx, rx) = oneshot::channel();
473        self.request_sender
474            .as_ref()
475            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
476            .send(PluginRequest::ExecuteAction {
477                action_name: action_name.to_string(),
478                response: tx,
479            })
480            .map_err(|_| anyhow!("Plugin thread not responding"))?;
481
482        tracing::trace!("execute_action_async: request sent for '{}'", action_name);
483        Ok(rx)
484    }
485
486    /// Run a hook (non-blocking, fire-and-forget)
487    ///
488    /// This is the key improvement: hooks are now non-blocking.
489    /// The plugin thread will execute them asynchronously and
490    /// any results will come back via the PluginCommand channel.
491    pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
492        if let Some(sender) = self.request_sender.as_ref() {
493            let _ = sender.send(PluginRequest::RunHook {
494                hook_name: hook_name.to_string(),
495                args,
496            });
497        }
498    }
499
500    /// Check if any handlers are registered for a hook (blocking)
501    pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
502        let (tx, rx) = oneshot::channel();
503        let Some(sender) = self.request_sender.as_ref() else {
504            return false;
505        };
506        if sender
507            .send(PluginRequest::HasHookHandlers {
508                hook_name: hook_name.to_string(),
509                response: tx,
510            })
511            .is_err()
512        {
513            return false;
514        }
515
516        rx.recv().unwrap_or(false)
517    }
518
519    /// List all loaded plugins (blocking)
520    pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
521        let (tx, rx) = oneshot::channel();
522        let Some(sender) = self.request_sender.as_ref() else {
523            return vec![];
524        };
525        if sender
526            .send(PluginRequest::ListPlugins { response: tx })
527            .is_err()
528        {
529            return vec![];
530        }
531
532        rx.recv().unwrap_or_default()
533    }
534
535    /// Process pending plugin commands (non-blocking)
536    ///
537    /// Returns immediately with any pending commands by polling the command queue directly.
538    /// This does not require the plugin thread to respond, avoiding deadlocks.
539    pub fn process_commands(&mut self) -> Vec<PluginCommand> {
540        let mut commands = Vec::new();
541        while let Ok(cmd) = self.command_receiver.try_recv() {
542            commands.push(cmd);
543        }
544        commands
545    }
546
547    /// Get the state snapshot handle for editor to update
548    pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
549        Arc::clone(&self.state_snapshot)
550    }
551
552    /// Shutdown the plugin thread
553    pub fn shutdown(&mut self) {
554        tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
555
556        // Drop all pending response senders - this wakes up any plugin code waiting for responses
557        // by causing their oneshot receivers to return an error
558        if let Ok(mut pending) = self.pending_responses.lock() {
559            if !pending.is_empty() {
560                tracing::warn!(
561                    "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
562                    pending.len(),
563                    pending.keys().collect::<Vec<_>>()
564                );
565                pending.clear(); // Drop all senders, waking up waiting receivers
566            }
567        }
568
569        // First send a Shutdown request to allow clean processing of pending work
570        if let Some(sender) = self.request_sender.as_ref() {
571            tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
572            let _ = sender.send(PluginRequest::Shutdown);
573        }
574
575        // Then drop the sender to close the channel - this reliably wakes the receiver
576        // even when it's parked in a tokio LocalSet (the Shutdown message above may not wake it)
577        tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
578        self.request_sender.take();
579
580        if let Some(handle) = self.thread_handle.take() {
581            tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
582            let _ = handle.join();
583            tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
584        }
585
586        tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
587    }
588
589    /// Resolve an async callback in the plugin runtime
590    /// Called by the app when async operations (SpawnProcess, Delay) complete
591    pub fn resolve_callback(
592        &self,
593        callback_id: fresh_core::api::JsCallbackId,
594        result_json: String,
595    ) {
596        if let Some(sender) = self.request_sender.as_ref() {
597            let _ = sender.send(PluginRequest::ResolveCallback {
598                callback_id,
599                result_json,
600            });
601        }
602    }
603
604    /// Reject an async callback in the plugin runtime
605    /// Called by the app when async operations fail
606    pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
607        if let Some(sender) = self.request_sender.as_ref() {
608            let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
609        }
610    }
611}
612
613impl Drop for PluginThreadHandle {
614    fn drop(&mut self) {
615        self.shutdown();
616    }
617}
618
619fn respond_to_pending(
620    pending_responses: &PendingResponses,
621    response: fresh_core::api::PluginResponse,
622) -> bool {
623    let request_id = match &response {
624        fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
625        fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
626        fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
627        fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
628        fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
629        fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
630        fresh_core::api::PluginResponse::LineEndPosition { request_id, .. } => *request_id,
631        fresh_core::api::PluginResponse::BufferLineCount { request_id, .. } => *request_id,
632    };
633
634    let sender = {
635        let mut pending = pending_responses.lock().unwrap();
636        pending.remove(&request_id)
637    };
638
639    if let Some(tx) = sender {
640        let _ = tx.send(response);
641        true
642    } else {
643        false
644    }
645}
646
/// Unit tests for `respond_to_pending`.
#[cfg(test)]
mod plugin_thread_tests {
    use super::*;
    use fresh_core::api::PluginResponse;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tokio::sync::oneshot;

    #[test]
    fn respond_to_pending_sends_lsp_response() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(123, tx);

        let delivered = respond_to_pending(
            &pending,
            PluginResponse::LspRequest {
                request_id: 123,
                result: Ok(json!({ "key": "value" })),
            },
        );
        assert!(delivered, "a registered request id must be reported as delivered");

        let response = rx.try_recv().expect("expected response");
        match response {
            PluginResponse::LspRequest { result, .. } => {
                assert_eq!(result.unwrap(), json!({ "key": "value" }));
            }
            _ => panic!("unexpected variant"),
        }

        assert!(pending.lock().unwrap().is_empty());
    }

    #[test]
    fn respond_to_pending_handles_virtual_buffer_created() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(456, tx);

        let delivered = respond_to_pending(
            &pending,
            PluginResponse::VirtualBufferCreated {
                request_id: 456,
                buffer_id: fresh_core::BufferId(7),
                split_id: Some(fresh_core::SplitId(1)),
            },
        );
        assert!(delivered, "a registered request id must be reported as delivered");

        let response = rx.try_recv().expect("expected response");
        match response {
            PluginResponse::VirtualBufferCreated { buffer_id, .. } => {
                assert_eq!(buffer_id.0, 7);
            }
            _ => panic!("unexpected variant"),
        }

        assert!(pending.lock().unwrap().is_empty());
    }

    #[test]
    fn respond_to_pending_returns_false_for_unknown_request() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(1, tx);

        let delivered = respond_to_pending(
            &pending,
            PluginResponse::LspRequest {
                request_id: 2,
                result: Ok(json!(null)),
            },
        );

        // Nothing is registered under id 2: the call reports a miss and the
        // unrelated pending entry is left untouched.
        assert!(!delivered);
        assert!(rx.try_recv().is_err());
        assert_eq!(pending.lock().unwrap().len(), 1);
    }
}
707
/// Main loop for the plugin thread
///
/// Uses `tokio::select!` to interleave request handling with periodic event loop
/// polling. This allows long-running promises (like process spawns) to make progress
/// even when no requests are coming in, preventing the UI from getting stuck.
///
/// The loop returns when a handled request asks for shutdown or when the
/// request channel is closed.
///
/// # Panics
/// Panics if a fatal JS error has been recorded; the check runs at the top of
/// every loop iteration.
async fn plugin_thread_loop(
    runtime: Rc<RefCell<QuickJsBackend>>,
    plugins: &mut HashMap<String, TsPluginInfo>,
    mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
) {
    tracing::info!("Plugin thread event loop started");

    // Interval for polling the JS event loop when there's pending work
    let poll_interval = Duration::from_millis(1);
    // Gates the polling branch below: while false, the loop only waits for requests.
    let mut has_pending_work = false;

    loop {
        // Check for fatal JS errors (e.g., unhandled promise rejections in test mode)
        // These are set via set_fatal_js_error() because panicking inside FFI callbacks
        // is caught by rquickjs and doesn't terminate the thread.
        if crate::backend::has_fatal_js_error() {
            if let Some(error_msg) = crate::backend::take_fatal_js_error() {
                tracing::error!(
                    "Fatal JS error detected, terminating plugin thread: {}",
                    error_msg
                );
                panic!("Fatal plugin error: {}", error_msg);
            }
        }

        tokio::select! {
            biased; // Prefer handling requests over polling

            request = request_receiver.recv() => {
                match request {
                    // Actions are handled inline here rather than through handle_request.
                    Some(PluginRequest::ExecuteAction {
                        action_name,
                        response,
                    }) => {
                        // Start the action without blocking - this allows us to process
                        // ResolveCallback requests that the action may be waiting for.
                        let result = runtime.borrow_mut().start_action(&action_name);
                        // The caller may have dropped its receiver; ignore that.
                        let _ = response.send(result);
                        has_pending_work = true; // Action may have started async work
                    }
                    Some(request) => {
                        let should_shutdown =
                            handle_request(request, Rc::clone(&runtime), plugins).await;

                        if should_shutdown {
                            break;
                        }
                        has_pending_work = true; // Request may have started async work
                    }
                    None => {
                        // Channel closed
                        tracing::info!("Plugin thread request channel closed");
                        break;
                    }
                }
            }

            // Poll the JS event loop periodically to make progress on pending promises.
            // The returned flag decides whether polling continues on the next iteration.
            _ = tokio::time::sleep(poll_interval), if has_pending_work => {
                has_pending_work = runtime.borrow_mut().poll_event_loop_once();
            }
        }
    }
}
777
778/// Execute an action while processing incoming hook requests concurrently.
779///
780/// This prevents deadlock when an action awaits a response from the main thread
781/// while the main thread is waiting for a blocking hook to complete.
782///
783/// # Safety (clippy::await_holding_refcell_ref)
784/// The RefCell borrow held across await is safe because:
785/// - This runs on a single-threaded tokio runtime (no parallel task execution)
786/// - No spawn_local calls exist that could create concurrent access to `runtime`
787/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
788
789/// Run a hook with Rc<RefCell<QuickJsBackend>>
790///
791/// # Safety (clippy::await_holding_refcell_ref)
792/// The RefCell borrow held across await is safe because:
793/// - This runs on a single-threaded tokio runtime (no parallel task execution)
794/// - No spawn_local calls exist that could create concurrent access to `runtime`
795/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
796#[allow(clippy::await_holding_refcell_ref)]
797async fn run_hook_internal_rc(
798    runtime: Rc<RefCell<QuickJsBackend>>,
799    hook_name: &str,
800    args: &HookArgs,
801) -> Result<()> {
802    // Convert HookArgs to JSON using hook_args_to_json which produces flat JSON
803    // (not enum-tagged JSON from serde's default Serialize)
804    let json_start = std::time::Instant::now();
805    let json_string = fresh_core::hooks::hook_args_to_json(args)?;
806    let json_data: serde_json::Value = serde_json::from_str(&json_string)?;
807    tracing::trace!(
808        hook = hook_name,
809        json_ms = json_start.elapsed().as_micros(),
810        "hook args serialized"
811    );
812
813    // Emit to TypeScript handlers
814    let emit_start = std::time::Instant::now();
815    runtime.borrow_mut().emit(hook_name, &json_data).await?;
816    tracing::trace!(
817        hook = hook_name,
818        emit_ms = emit_start.elapsed().as_millis(),
819        "emit completed"
820    );
821
822    Ok(())
823}
824
825/// Handle a single request in the plugin thread
826async fn handle_request(
827    request: PluginRequest,
828    runtime: Rc<RefCell<QuickJsBackend>>,
829    plugins: &mut HashMap<String, TsPluginInfo>,
830) -> bool {
831    match request {
832        PluginRequest::LoadPlugin { path, response } => {
833            let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
834            let _ = response.send(result);
835        }
836
837        PluginRequest::LoadPluginsFromDir { dir, response } => {
838            let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
839            let _ = response.send(errors);
840        }
841
842        PluginRequest::LoadPluginsFromDirWithConfig {
843            dir,
844            plugin_configs,
845            response,
846        } => {
847            let (errors, discovered) = load_plugins_from_dir_with_config_internal(
848                Rc::clone(&runtime),
849                plugins,
850                &dir,
851                &plugin_configs,
852            )
853            .await;
854            let _ = response.send((errors, discovered));
855        }
856
857        PluginRequest::UnloadPlugin { name, response } => {
858            let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
859            let _ = response.send(result);
860        }
861
862        PluginRequest::ReloadPlugin { name, response } => {
863            let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
864            let _ = response.send(result);
865        }
866
867        PluginRequest::ExecuteAction {
868            action_name,
869            response,
870        } => {
871            // This is handled in plugin_thread_loop with select! for concurrent processing
872            // If we get here, it's an unexpected state
873            tracing::error!(
874                "ExecuteAction should be handled in main loop, not here: {}",
875                action_name
876            );
877            let _ = response.send(Err(anyhow::anyhow!(
878                "Internal error: ExecuteAction in wrong handler"
879            )));
880        }
881
882        PluginRequest::RunHook { hook_name, args } => {
883            // Fire-and-forget hook execution
884            let hook_start = std::time::Instant::now();
885            // Use info level for prompt hooks to aid debugging
886            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
887                tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
888            } else {
889                tracing::trace!(hook = %hook_name, "RunHook request received");
890            }
891            if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
892                let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
893                tracing::error!("{}", error_msg);
894                // Surface the error to the UI
895                runtime.borrow_mut().send_status(error_msg);
896            }
897            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
898                tracing::info!(
899                    hook = %hook_name,
900                    elapsed_ms = hook_start.elapsed().as_millis(),
901                    "RunHook completed (prompt hook)"
902                );
903            } else {
904                tracing::trace!(
905                    hook = %hook_name,
906                    elapsed_ms = hook_start.elapsed().as_millis(),
907                    "RunHook completed"
908                );
909            }
910        }
911
912        PluginRequest::HasHookHandlers {
913            hook_name,
914            response,
915        } => {
916            let has_handlers = runtime.borrow().has_handlers(&hook_name);
917            let _ = response.send(has_handlers);
918        }
919
920        PluginRequest::ListPlugins { response } => {
921            let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
922            let _ = response.send(plugin_list);
923        }
924
925        PluginRequest::ResolveCallback {
926            callback_id,
927            result_json,
928        } => {
929            tracing::info!(
930                "ResolveCallback: resolving callback_id={} with result_json={}",
931                callback_id,
932                result_json
933            );
934            runtime
935                .borrow_mut()
936                .resolve_callback(callback_id, &result_json);
937            // resolve_callback now runs execute_pending_job() internally
938            tracing::info!(
939                "ResolveCallback: done resolving callback_id={}",
940                callback_id
941            );
942        }
943
944        PluginRequest::RejectCallback { callback_id, error } => {
945            runtime.borrow_mut().reject_callback(callback_id, &error);
946            // reject_callback now runs execute_pending_job() internally
947        }
948
949        PluginRequest::Shutdown => {
950            tracing::info!("Plugin thread received shutdown request");
951            return true;
952        }
953    }
954
955    false
956}
957
958/// Load a plugin from a file
959///
960/// # Safety (clippy::await_holding_refcell_ref)
961/// The RefCell borrow held across await is safe because:
962/// - This runs on a single-threaded tokio runtime (no parallel task execution)
963/// - No spawn_local calls exist that could create concurrent access to `runtime`
964/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
965#[allow(clippy::await_holding_refcell_ref)]
966async fn load_plugin_internal(
967    runtime: Rc<RefCell<QuickJsBackend>>,
968    plugins: &mut HashMap<String, TsPluginInfo>,
969    path: &Path,
970) -> Result<()> {
971    let plugin_name = path
972        .file_stem()
973        .and_then(|s| s.to_str())
974        .ok_or_else(|| anyhow!("Invalid plugin filename"))?
975        .to_string();
976
977    tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
978    tracing::debug!(
979        "load_plugin_internal: starting module load for plugin '{}'",
980        plugin_name
981    );
982
983    // Load and execute the module, passing plugin name for command registration
984    let path_str = path
985        .to_str()
986        .ok_or_else(|| anyhow!("Invalid path encoding"))?;
987
988    // Try to load accompanying .i18n.json file
989    let i18n_path = path.with_extension("i18n.json");
990    if i18n_path.exists() {
991        if let Ok(content) = std::fs::read_to_string(&i18n_path) {
992            if let Ok(strings) = serde_json::from_str::<
993                std::collections::HashMap<String, std::collections::HashMap<String, String>>,
994            >(&content)
995            {
996                runtime
997                    .borrow_mut()
998                    .services
999                    .register_plugin_strings(&plugin_name, strings);
1000                tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
1001            }
1002        }
1003    }
1004
1005    let load_start = std::time::Instant::now();
1006    runtime
1007        .borrow_mut()
1008        .load_module_with_source(path_str, &plugin_name)
1009        .await?;
1010    let load_elapsed = load_start.elapsed();
1011
1012    tracing::debug!(
1013        "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
1014        plugin_name,
1015        load_elapsed
1016    );
1017
1018    // Store plugin info
1019    plugins.insert(
1020        plugin_name.clone(),
1021        TsPluginInfo {
1022            name: plugin_name.clone(),
1023            path: path.to_path_buf(),
1024            enabled: true,
1025        },
1026    );
1027
1028    tracing::debug!(
1029        "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1030        plugin_name,
1031        plugins.len()
1032    );
1033
1034    Ok(())
1035}
1036
1037/// Load all plugins from a directory
1038async fn load_plugins_from_dir_internal(
1039    runtime: Rc<RefCell<QuickJsBackend>>,
1040    plugins: &mut HashMap<String, TsPluginInfo>,
1041    dir: &Path,
1042) -> Vec<String> {
1043    tracing::debug!(
1044        "load_plugins_from_dir_internal: scanning directory {:?}",
1045        dir
1046    );
1047    let mut errors = Vec::new();
1048
1049    if !dir.exists() {
1050        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1051        return errors;
1052    }
1053
1054    // Scan directory for .ts and .js files
1055    match std::fs::read_dir(dir) {
1056        Ok(entries) => {
1057            for entry in entries.flatten() {
1058                let path = entry.path();
1059                let ext = path.extension().and_then(|s| s.to_str());
1060                if ext == Some("ts") || ext == Some("js") {
1061                    tracing::debug!(
1062                        "load_plugins_from_dir_internal: attempting to load {:?}",
1063                        path
1064                    );
1065                    if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1066                    {
1067                        let err = format!("Failed to load {:?}: {}", path, e);
1068                        tracing::error!("{}", err);
1069                        errors.push(err);
1070                    }
1071                }
1072            }
1073
1074            tracing::debug!(
1075                "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1076                dir,
1077                errors.len()
1078            );
1079        }
1080        Err(e) => {
1081            let err = format!("Failed to read plugin directory: {}", e);
1082            tracing::error!("{}", err);
1083            errors.push(err);
1084        }
1085    }
1086
1087    errors
1088}
1089
1090/// Load all plugins from a directory with config support
1091/// Returns (errors, discovered_plugins) where discovered_plugins contains all
1092/// found plugin files with their configs (respecting enabled state from provided configs)
1093async fn load_plugins_from_dir_with_config_internal(
1094    runtime: Rc<RefCell<QuickJsBackend>>,
1095    plugins: &mut HashMap<String, TsPluginInfo>,
1096    dir: &Path,
1097    plugin_configs: &HashMap<String, PluginConfig>,
1098) -> (Vec<String>, HashMap<String, PluginConfig>) {
1099    tracing::debug!(
1100        "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1101        dir
1102    );
1103    let mut errors = Vec::new();
1104    let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1105
1106    if !dir.exists() {
1107        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1108        return (errors, discovered_plugins);
1109    }
1110
1111    // First pass: scan directory and collect all plugin files
1112    let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1113    match std::fs::read_dir(dir) {
1114        Ok(entries) => {
1115            for entry in entries.flatten() {
1116                let path = entry.path();
1117                let ext = path.extension().and_then(|s| s.to_str());
1118                if ext == Some("ts") || ext == Some("js") {
1119                    // Skip .i18n.json files (they're not plugins)
1120                    if path.to_string_lossy().contains(".i18n.") {
1121                        continue;
1122                    }
1123                    // Get plugin name from filename (without extension)
1124                    let plugin_name = path
1125                        .file_stem()
1126                        .and_then(|s| s.to_str())
1127                        .unwrap_or("unknown")
1128                        .to_string();
1129                    plugin_files.push((plugin_name, path));
1130                }
1131            }
1132        }
1133        Err(e) => {
1134            let err = format!("Failed to read plugin directory: {}", e);
1135            tracing::error!("{}", err);
1136            errors.push(err);
1137            return (errors, discovered_plugins);
1138        }
1139    }
1140
1141    // Second pass: build discovered_plugins map and load enabled plugins
1142    for (plugin_name, path) in plugin_files {
1143        // Check if we have an existing config for this plugin
1144        let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1145            // Use existing config but ensure path is set
1146            PluginConfig {
1147                enabled: existing_config.enabled,
1148                path: Some(path.clone()),
1149            }
1150        } else {
1151            // Create new config with default enabled = true
1152            PluginConfig::new_with_path(path.clone())
1153        };
1154
1155        // Add to discovered plugins
1156        discovered_plugins.insert(plugin_name.clone(), config.clone());
1157
1158        // Only load if enabled
1159        if config.enabled {
1160            tracing::debug!(
1161                "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1162                plugin_name
1163            );
1164            if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1165                let err = format!("Failed to load {:?}: {}", path, e);
1166                tracing::error!("{}", err);
1167                errors.push(err);
1168            }
1169        } else {
1170            tracing::info!(
1171                "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1172                plugin_name
1173            );
1174        }
1175    }
1176
1177    tracing::debug!(
1178        "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1179        discovered_plugins.len(),
1180        errors.len()
1181    );
1182
1183    (errors, discovered_plugins)
1184}
1185
1186/// Unload a plugin
1187fn unload_plugin_internal(
1188    runtime: Rc<RefCell<QuickJsBackend>>,
1189    plugins: &mut HashMap<String, TsPluginInfo>,
1190    name: &str,
1191) -> Result<()> {
1192    if plugins.remove(name).is_some() {
1193        tracing::info!("Unloading TypeScript plugin: {}", name);
1194
1195        // Unregister i18n strings
1196        runtime
1197            .borrow_mut()
1198            .services
1199            .unregister_plugin_strings(name);
1200
1201        // Remove all commands registered by this plugin
1202        runtime
1203            .borrow()
1204            .services
1205            .unregister_commands_by_plugin(name);
1206
1207        Ok(())
1208    } else {
1209        Err(anyhow!("Plugin '{}' not found", name))
1210    }
1211}
1212
1213/// Reload a plugin
1214async fn reload_plugin_internal(
1215    runtime: Rc<RefCell<QuickJsBackend>>,
1216    plugins: &mut HashMap<String, TsPluginInfo>,
1217    name: &str,
1218) -> Result<()> {
1219    let path = plugins
1220        .get(name)
1221        .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1222        .path
1223        .clone();
1224
1225    unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1226    load_plugin_internal(runtime, plugins, &path).await?;
1227
1228    Ok(())
1229}
1230
#[cfg(test)]
mod tests {
    use super::*;
    use fresh_core::hooks::hook_args_to_json;

    /// A value sent through a oneshot channel is received unchanged.
    #[test]
    fn test_oneshot_channel() {
        let (sender, receiver) = oneshot::channel::<i32>();
        assert!(sender.send(42).is_ok());
        assert_eq!(receiver.recv().unwrap(), 42);
    }

    /// `EditorInitialized` carries no data and serializes to an empty JSON object.
    #[test]
    fn test_hook_args_to_json_editor_initialized() {
        let serialized = hook_args_to_json(&HookArgs::EditorInitialized).unwrap();
        assert_eq!(serialized, "{}");
    }

    /// `PromptChanged` serializes its fields as top-level JSON keys.
    #[test]
    fn test_hook_args_to_json_prompt_changed() {
        let serialized = hook_args_to_json(&HookArgs::PromptChanged {
            prompt_type: "search".to_string(),
            input: "test".to_string(),
        })
        .unwrap();
        let value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(value["prompt_type"], "search");
        assert_eq!(value["input"], "test");
    }
}