Skip to main content

fresh_plugin_runtime/
thread.rs

1//! Plugin Thread: Dedicated thread for TypeScript plugin execution
2//!
3//! This module implements a dedicated thread architecture for plugin execution,
4//! using QuickJS as the JavaScript runtime with oxc for TypeScript transpilation.
5//!
6//! Architecture:
7//! - Main thread (UI) sends requests to plugin thread via channel
8//! - Plugin thread owns QuickJS runtime and persistent tokio runtime
9//! - Results are sent back via the existing PluginCommand channel
10//! - Async operations complete naturally without runtime destruction
11
12use crate::backend::quickjs_backend::{PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25// Re-export PluginConfig from fresh-core
26pub use fresh_core::config::PluginConfig;
27
28/// Request messages sent to the plugin thread
29#[derive(Debug)]
30pub enum PluginRequest {
31    /// Load a plugin from a file
32    LoadPlugin {
33        path: PathBuf,
34        response: oneshot::Sender<Result<()>>,
35    },
36
37    /// Resolve an async callback with a result (for async operations like SpawnProcess, Delay)
38    ResolveCallback {
39        callback_id: fresh_core::api::JsCallbackId,
40        result_json: String,
41    },
42
43    /// Reject an async callback with an error
44    RejectCallback {
45        callback_id: fresh_core::api::JsCallbackId,
46        error: String,
47    },
48
49    /// Load all plugins from a directory
50    LoadPluginsFromDir {
51        dir: PathBuf,
52        response: oneshot::Sender<Vec<String>>,
53    },
54
55    /// Load all plugins from a directory with config support
56    /// Returns (errors, discovered_plugins) where discovered_plugins contains
57    /// all found plugins with their paths and enabled status
58    LoadPluginsFromDirWithConfig {
59        dir: PathBuf,
60        plugin_configs: HashMap<String, PluginConfig>,
61        response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
62    },
63
64    /// Unload a plugin by name
65    UnloadPlugin {
66        name: String,
67        response: oneshot::Sender<Result<()>>,
68    },
69
70    /// Reload a plugin by name
71    ReloadPlugin {
72        name: String,
73        response: oneshot::Sender<Result<()>>,
74    },
75
76    /// Execute a plugin action
77    ExecuteAction {
78        action_name: String,
79        response: oneshot::Sender<Result<()>>,
80    },
81
82    /// Run a hook (fire-and-forget, no response needed)
83    RunHook { hook_name: String, args: HookArgs },
84
85    /// Check if any handlers are registered for a hook
86    HasHookHandlers {
87        hook_name: String,
88        response: oneshot::Sender<bool>,
89    },
90
91    /// List all loaded plugins
92    ListPlugins {
93        response: oneshot::Sender<Vec<TsPluginInfo>>,
94    },
95
96    /// Shutdown the plugin thread
97    Shutdown,
98}
99
100/// Simple oneshot channel implementation
101pub mod oneshot {
102    use std::fmt;
103    use std::sync::mpsc;
104
105    pub struct Sender<T>(mpsc::SyncSender<T>);
106    pub struct Receiver<T>(mpsc::Receiver<T>);
107
108    use anyhow::Result;
109
110    impl<T> fmt::Debug for Sender<T> {
111        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112            f.debug_tuple("Sender").finish()
113        }
114    }
115
116    impl<T> fmt::Debug for Receiver<T> {
117        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118            f.debug_tuple("Receiver").finish()
119        }
120    }
121
122    impl<T> Sender<T> {
123        pub fn send(self, value: T) -> Result<(), T> {
124            self.0.send(value).map_err(|e| e.0)
125        }
126    }
127
128    impl<T> Receiver<T> {
129        pub fn recv(self) -> Result<T, mpsc::RecvError> {
130            self.0.recv()
131        }
132
133        pub fn recv_timeout(
134            self,
135            timeout: std::time::Duration,
136        ) -> Result<T, mpsc::RecvTimeoutError> {
137            self.0.recv_timeout(timeout)
138        }
139
140        pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
141            self.0.try_recv()
142        }
143    }
144
145    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
146        let (tx, rx) = mpsc::sync_channel(1);
147        (Sender(tx), Receiver(rx))
148    }
149}
150
151/// Handle to the plugin thread for sending requests
152pub struct PluginThreadHandle {
153    /// Channel to send requests to the plugin thread
154    /// Wrapped in Option so we can drop it to signal shutdown
155    request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,
156
157    /// Thread join handle
158    thread_handle: Option<JoinHandle<()>>,
159
160    /// State snapshot handle for editor to update
161    state_snapshot: Arc<RwLock<EditorStateSnapshot>>,
162
163    /// Pending response senders for async operations (shared with runtime)
164    pending_responses: PendingResponses,
165
166    /// Receiver for plugin commands (polled by editor directly)
167    command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
168}
169
170impl PluginThreadHandle {
171    /// Create a new plugin thread and return its handle
172    pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
173        tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
174
175        // Create channel for plugin commands
176        let (command_sender, command_receiver) = std::sync::mpsc::channel();
177
178        // Create editor state snapshot for query API
179        let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
180
181        // Create pending responses map (shared between handle and runtime)
182        let pending_responses: PendingResponses =
183            Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
184        let thread_pending_responses = Arc::clone(&pending_responses);
185
186        // Create channel for requests (unbounded allows sync send, async recv)
187        let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
188
189        // Clone state snapshot for the thread
190        let thread_state_snapshot = Arc::clone(&state_snapshot);
191
192        // Spawn the plugin thread
193        tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
194        let thread_handle = thread::spawn(move || {
195            tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
196            // Create tokio runtime for the plugin thread
197            let rt = match tokio::runtime::Builder::new_current_thread()
198                .enable_all()
199                .build()
200            {
201                Ok(rt) => {
202                    tracing::debug!("Plugin thread: tokio runtime created successfully");
203                    rt
204                }
205                Err(e) => {
206                    tracing::error!("Failed to create plugin thread runtime: {}", e);
207                    return;
208                }
209            };
210
211            // Create QuickJS runtime with state
212            tracing::debug!("Plugin thread: creating QuickJS runtime");
213            let runtime = match QuickJsBackend::with_state_and_responses(
214                Arc::clone(&thread_state_snapshot),
215                command_sender,
216                thread_pending_responses,
217                services.clone(),
218            ) {
219                Ok(rt) => {
220                    tracing::debug!("Plugin thread: QuickJS runtime created successfully");
221                    rt
222                }
223                Err(e) => {
224                    tracing::error!("Failed to create QuickJS runtime: {}", e);
225                    return;
226                }
227            };
228
229            // Create internal manager state
230            let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
231
232            // Run the event loop with a LocalSet to allow concurrent task execution
233            tracing::debug!("Plugin thread: starting event loop with LocalSet");
234            let local = tokio::task::LocalSet::new();
235            local.block_on(&rt, async {
236                // Wrap runtime in RefCell for interior mutability during concurrent operations
237                let runtime = Rc::new(RefCell::new(runtime));
238                tracing::debug!("Plugin thread: entering plugin_thread_loop");
239                plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
240            });
241
242            tracing::info!("Plugin thread shutting down");
243        });
244
245        tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
246        tracing::info!("Plugin thread spawned");
247
248        Ok(Self {
249            request_sender: Some(request_sender),
250            thread_handle: Some(thread_handle),
251            state_snapshot,
252            pending_responses,
253            command_receiver,
254        })
255    }
256
257    /// Check if the plugin thread is still alive
258    pub fn is_alive(&self) -> bool {
259        self.thread_handle
260            .as_ref()
261            .map(|h| !h.is_finished())
262            .unwrap_or(false)
263    }
264
265    /// Check thread health and panic if the plugin thread died due to a panic.
266    /// This propagates plugin thread panics to the calling thread.
267    /// Call this periodically to detect plugin thread failures.
268    pub fn check_thread_health(&mut self) {
269        if let Some(handle) = &self.thread_handle {
270            if handle.is_finished() {
271                tracing::error!(
272                    "check_thread_health: plugin thread is finished, checking for panic"
273                );
274                // Thread finished - take ownership and check result
275                if let Some(handle) = self.thread_handle.take() {
276                    match handle.join() {
277                        Ok(()) => {
278                            tracing::warn!("Plugin thread exited normally (unexpected)");
279                        }
280                        Err(panic_payload) => {
281                            // Re-panic with the original panic message to propagate it
282                            std::panic::resume_unwind(panic_payload);
283                        }
284                    }
285                }
286            }
287        }
288    }
289
290    /// Deliver a response to a pending async operation in the plugin
291    ///
292    /// This is called by the editor after processing a command that requires a response.
293    pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
294        // First try to find a pending Rust request (oneshot channel)
295        if respond_to_pending(&self.pending_responses, response.clone()) {
296            return;
297        }
298
299        // If not found, it must be a JS callback
300        use fresh_core::api::{JsCallbackId, PluginResponse};
301
302        match response {
303            PluginResponse::VirtualBufferCreated {
304                request_id,
305                buffer_id,
306                split_id,
307            } => {
308                // Return an object with bufferId and splitId (camelCase for JS)
309                let result = serde_json::json!({
310                    "bufferId": buffer_id.0,
311                    "splitId": split_id.map(|s| s.0)
312                });
313                self.resolve_callback(JsCallbackId(request_id), result.to_string());
314            }
315            PluginResponse::LspRequest { request_id, result } => match result {
316                Ok(value) => {
317                    self.resolve_callback(JsCallbackId(request_id), value.to_string());
318                }
319                Err(e) => {
320                    self.reject_callback(JsCallbackId(request_id), e);
321                }
322            },
323            PluginResponse::HighlightsComputed { request_id, spans } => {
324                let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
325                self.resolve_callback(JsCallbackId(request_id), result);
326            }
327            PluginResponse::BufferText { request_id, text } => match text {
328                Ok(content) => {
329                    // JSON stringify the content string
330                    let result =
331                        serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
332                    self.resolve_callback(JsCallbackId(request_id), result);
333                }
334                Err(e) => {
335                    self.reject_callback(JsCallbackId(request_id), e);
336                }
337            },
338            PluginResponse::CompositeBufferCreated {
339                request_id,
340                buffer_id,
341            } => {
342                // Return just the buffer_id number, not an object
343                self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
344            }
345            PluginResponse::LineStartPosition {
346                request_id,
347                position,
348            } => {
349                // Return the position as a number or null
350                let result =
351                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
352                self.resolve_callback(JsCallbackId(request_id), result);
353            }
354        }
355    }
356
357    /// Load a plugin from a file (blocking)
358    pub fn load_plugin(&self, path: &Path) -> Result<()> {
359        let (tx, rx) = oneshot::channel();
360        self.request_sender
361            .as_ref()
362            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
363            .send(PluginRequest::LoadPlugin {
364                path: path.to_path_buf(),
365                response: tx,
366            })
367            .map_err(|_| anyhow!("Plugin thread not responding"))?;
368
369        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
370    }
371
372    /// Load all plugins from a directory (blocking)
373    pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
374        let (tx, rx) = oneshot::channel();
375        let Some(sender) = self.request_sender.as_ref() else {
376            return vec!["Plugin thread shut down".to_string()];
377        };
378        if sender
379            .send(PluginRequest::LoadPluginsFromDir {
380                dir: dir.to_path_buf(),
381                response: tx,
382            })
383            .is_err()
384        {
385            return vec!["Plugin thread not responding".to_string()];
386        }
387
388        rx.recv()
389            .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
390    }
391
392    /// Load all plugins from a directory with config support (blocking)
393    /// Returns (errors, discovered_plugins) where discovered_plugins is a map of
394    /// plugin name -> PluginConfig with paths populated.
395    pub fn load_plugins_from_dir_with_config(
396        &self,
397        dir: &Path,
398        plugin_configs: &HashMap<String, PluginConfig>,
399    ) -> (Vec<String>, HashMap<String, PluginConfig>) {
400        let (tx, rx) = oneshot::channel();
401        let Some(sender) = self.request_sender.as_ref() else {
402            return (vec!["Plugin thread shut down".to_string()], HashMap::new());
403        };
404        if sender
405            .send(PluginRequest::LoadPluginsFromDirWithConfig {
406                dir: dir.to_path_buf(),
407                plugin_configs: plugin_configs.clone(),
408                response: tx,
409            })
410            .is_err()
411        {
412            return (
413                vec!["Plugin thread not responding".to_string()],
414                HashMap::new(),
415            );
416        }
417
418        rx.recv()
419            .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
420    }
421
422    /// Unload a plugin (blocking)
423    pub fn unload_plugin(&self, name: &str) -> Result<()> {
424        let (tx, rx) = oneshot::channel();
425        self.request_sender
426            .as_ref()
427            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
428            .send(PluginRequest::UnloadPlugin {
429                name: name.to_string(),
430                response: tx,
431            })
432            .map_err(|_| anyhow!("Plugin thread not responding"))?;
433
434        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
435    }
436
437    /// Reload a plugin (blocking)
438    pub fn reload_plugin(&self, name: &str) -> Result<()> {
439        let (tx, rx) = oneshot::channel();
440        self.request_sender
441            .as_ref()
442            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
443            .send(PluginRequest::ReloadPlugin {
444                name: name.to_string(),
445                response: tx,
446            })
447            .map_err(|_| anyhow!("Plugin thread not responding"))?;
448
449        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
450    }
451
452    /// Execute a plugin action (non-blocking)
453    ///
454    /// Returns a receiver that will receive the result when the action completes.
455    /// The caller should poll this while processing commands to avoid deadlock.
456    pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
457        tracing::trace!("execute_action_async: starting action '{}'", action_name);
458        let (tx, rx) = oneshot::channel();
459        self.request_sender
460            .as_ref()
461            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
462            .send(PluginRequest::ExecuteAction {
463                action_name: action_name.to_string(),
464                response: tx,
465            })
466            .map_err(|_| anyhow!("Plugin thread not responding"))?;
467
468        tracing::trace!("execute_action_async: request sent for '{}'", action_name);
469        Ok(rx)
470    }
471
472    /// Run a hook (non-blocking, fire-and-forget)
473    ///
474    /// This is the key improvement: hooks are now non-blocking.
475    /// The plugin thread will execute them asynchronously and
476    /// any results will come back via the PluginCommand channel.
477    pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
478        if let Some(sender) = self.request_sender.as_ref() {
479            let _ = sender.send(PluginRequest::RunHook {
480                hook_name: hook_name.to_string(),
481                args,
482            });
483        }
484    }
485
486    /// Check if any handlers are registered for a hook (blocking)
487    pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
488        let (tx, rx) = oneshot::channel();
489        let Some(sender) = self.request_sender.as_ref() else {
490            return false;
491        };
492        if sender
493            .send(PluginRequest::HasHookHandlers {
494                hook_name: hook_name.to_string(),
495                response: tx,
496            })
497            .is_err()
498        {
499            return false;
500        }
501
502        rx.recv().unwrap_or(false)
503    }
504
505    /// List all loaded plugins (blocking)
506    pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
507        let (tx, rx) = oneshot::channel();
508        let Some(sender) = self.request_sender.as_ref() else {
509            return vec![];
510        };
511        if sender
512            .send(PluginRequest::ListPlugins { response: tx })
513            .is_err()
514        {
515            return vec![];
516        }
517
518        rx.recv().unwrap_or_default()
519    }
520
521    /// Process pending plugin commands (non-blocking)
522    ///
523    /// Returns immediately with any pending commands by polling the command queue directly.
524    /// This does not require the plugin thread to respond, avoiding deadlocks.
525    pub fn process_commands(&mut self) -> Vec<PluginCommand> {
526        let mut commands = Vec::new();
527        while let Ok(cmd) = self.command_receiver.try_recv() {
528            commands.push(cmd);
529        }
530        commands
531    }
532
533    /// Get the state snapshot handle for editor to update
534    pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
535        Arc::clone(&self.state_snapshot)
536    }
537
538    /// Shutdown the plugin thread
539    pub fn shutdown(&mut self) {
540        tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
541
542        // Drop all pending response senders - this wakes up any plugin code waiting for responses
543        // by causing their oneshot receivers to return an error
544        if let Ok(mut pending) = self.pending_responses.lock() {
545            if !pending.is_empty() {
546                tracing::warn!(
547                    "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
548                    pending.len(),
549                    pending.keys().collect::<Vec<_>>()
550                );
551                pending.clear(); // Drop all senders, waking up waiting receivers
552            }
553        }
554
555        // First send a Shutdown request to allow clean processing of pending work
556        if let Some(sender) = self.request_sender.as_ref() {
557            tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
558            let _ = sender.send(PluginRequest::Shutdown);
559        }
560
561        // Then drop the sender to close the channel - this reliably wakes the receiver
562        // even when it's parked in a tokio LocalSet (the Shutdown message above may not wake it)
563        tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
564        self.request_sender.take();
565
566        if let Some(handle) = self.thread_handle.take() {
567            tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
568            let _ = handle.join();
569            tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
570        }
571
572        tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
573    }
574
575    /// Resolve an async callback in the plugin runtime
576    /// Called by the app when async operations (SpawnProcess, Delay) complete
577    pub fn resolve_callback(
578        &self,
579        callback_id: fresh_core::api::JsCallbackId,
580        result_json: String,
581    ) {
582        if let Some(sender) = self.request_sender.as_ref() {
583            let _ = sender.send(PluginRequest::ResolveCallback {
584                callback_id,
585                result_json,
586            });
587        }
588    }
589
590    /// Reject an async callback in the plugin runtime
591    /// Called by the app when async operations fail
592    pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
593        if let Some(sender) = self.request_sender.as_ref() {
594            let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
595        }
596    }
597}
598
599impl Drop for PluginThreadHandle {
600    fn drop(&mut self) {
601        self.shutdown();
602    }
603}
604
605fn respond_to_pending(
606    pending_responses: &PendingResponses,
607    response: fresh_core::api::PluginResponse,
608) -> bool {
609    let request_id = match &response {
610        fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
611        fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
612        fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
613        fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
614        fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
615        fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
616    };
617
618    let sender = {
619        let mut pending = pending_responses.lock().unwrap();
620        pending.remove(&request_id)
621    };
622
623    if let Some(tx) = sender {
624        let _ = tx.send(response);
625        true
626    } else {
627        false
628    }
629}
630
631#[cfg(test)]
632mod plugin_thread_tests {
633    use super::*;
634    use fresh_core::api::PluginResponse;
635    use serde_json::json;
636    use std::collections::HashMap;
637    use std::sync::{Arc, Mutex};
638    use tokio::sync::oneshot;
639
640    #[test]
641    fn respond_to_pending_sends_lsp_response() {
642        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
643        let (tx, mut rx) = oneshot::channel();
644        pending.lock().unwrap().insert(123, tx);
645
646        respond_to_pending(
647            &pending,
648            PluginResponse::LspRequest {
649                request_id: 123,
650                result: Ok(json!({ "key": "value" })),
651            },
652        );
653
654        let response = rx.try_recv().expect("expected response");
655        match response {
656            PluginResponse::LspRequest { result, .. } => {
657                assert_eq!(result.unwrap(), json!({ "key": "value" }));
658            }
659            _ => panic!("unexpected variant"),
660        }
661
662        assert!(pending.lock().unwrap().is_empty());
663    }
664
665    #[test]
666    fn respond_to_pending_handles_virtual_buffer_created() {
667        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
668        let (tx, mut rx) = oneshot::channel();
669        pending.lock().unwrap().insert(456, tx);
670
671        respond_to_pending(
672            &pending,
673            PluginResponse::VirtualBufferCreated {
674                request_id: 456,
675                buffer_id: fresh_core::BufferId(7),
676                split_id: Some(fresh_core::SplitId(1)),
677            },
678        );
679
680        let response = rx.try_recv().expect("expected response");
681        match response {
682            PluginResponse::VirtualBufferCreated { buffer_id, .. } => {
683                assert_eq!(buffer_id.0, 7);
684            }
685            _ => panic!("unexpected variant"),
686        }
687
688        assert!(pending.lock().unwrap().is_empty());
689    }
690}
691
692/// Main loop for the plugin thread
693///
694/// Uses `tokio::select!` to interleave request handling with periodic event loop
695/// polling. This allows long-running promises (like process spawns) to make progress
696/// even when no requests are coming in, preventing the UI from getting stuck.
697async fn plugin_thread_loop(
698    runtime: Rc<RefCell<QuickJsBackend>>,
699    plugins: &mut HashMap<String, TsPluginInfo>,
700    mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
701) {
702    tracing::info!("Plugin thread event loop started");
703
704    // Interval for polling the JS event loop when there's pending work
705    let poll_interval = Duration::from_millis(1);
706    let mut has_pending_work = false;
707
708    loop {
709        // Check for fatal JS errors (e.g., unhandled promise rejections in test mode)
710        // These are set via set_fatal_js_error() because panicking inside FFI callbacks
711        // is caught by rquickjs and doesn't terminate the thread.
712        if crate::backend::has_fatal_js_error() {
713            if let Some(error_msg) = crate::backend::take_fatal_js_error() {
714                tracing::error!(
715                    "Fatal JS error detected, terminating plugin thread: {}",
716                    error_msg
717                );
718                panic!("Fatal plugin error: {}", error_msg);
719            }
720        }
721
722        tokio::select! {
723            biased; // Prefer handling requests over polling
724
725            request = request_receiver.recv() => {
726                match request {
727                    Some(PluginRequest::ExecuteAction {
728                        action_name,
729                        response,
730                    }) => {
731                        // Start the action without blocking - this allows us to process
732                        // ResolveCallback requests that the action may be waiting for.
733                        let result = runtime.borrow_mut().start_action(&action_name);
734                        let _ = response.send(result);
735                        has_pending_work = true; // Action may have started async work
736                    }
737                    Some(request) => {
738                        let should_shutdown =
739                            handle_request(request, Rc::clone(&runtime), plugins).await;
740
741                        if should_shutdown {
742                            break;
743                        }
744                        has_pending_work = true; // Request may have started async work
745                    }
746                    None => {
747                        // Channel closed
748                        tracing::info!("Plugin thread request channel closed");
749                        break;
750                    }
751                }
752            }
753
754            // Poll the JS event loop periodically to make progress on pending promises
755            _ = tokio::time::sleep(poll_interval), if has_pending_work => {
756                has_pending_work = runtime.borrow_mut().poll_event_loop_once();
757            }
758        }
759    }
760}
761
762/// Execute an action while processing incoming hook requests concurrently.
763///
764/// This prevents deadlock when an action awaits a response from the main thread
765/// while the main thread is waiting for a blocking hook to complete.
766///
767/// # Safety (clippy::await_holding_refcell_ref)
768/// The RefCell borrow held across await is safe because:
769/// - This runs on a single-threaded tokio runtime (no parallel task execution)
770/// - No spawn_local calls exist that could create concurrent access to `runtime`
771/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
772
773/// Run a hook with Rc<RefCell<QuickJsBackend>>
774///
775/// # Safety (clippy::await_holding_refcell_ref)
776/// The RefCell borrow held across await is safe because:
777/// - This runs on a single-threaded tokio runtime (no parallel task execution)
778/// - No spawn_local calls exist that could create concurrent access to `runtime`
779/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
780#[allow(clippy::await_holding_refcell_ref)]
781async fn run_hook_internal_rc(
782    runtime: Rc<RefCell<QuickJsBackend>>,
783    hook_name: &str,
784    args: &HookArgs,
785) -> Result<()> {
786    // Convert HookArgs to JSON using hook_args_to_json which produces flat JSON
787    // (not enum-tagged JSON from serde's default Serialize)
788    let json_start = std::time::Instant::now();
789    let json_string = fresh_core::hooks::hook_args_to_json(args)?;
790    let json_data: serde_json::Value = serde_json::from_str(&json_string)?;
791    tracing::trace!(
792        hook = hook_name,
793        json_ms = json_start.elapsed().as_micros(),
794        "hook args serialized"
795    );
796
797    // Emit to TypeScript handlers
798    let emit_start = std::time::Instant::now();
799    runtime.borrow_mut().emit(hook_name, &json_data).await?;
800    tracing::trace!(
801        hook = hook_name,
802        emit_ms = emit_start.elapsed().as_millis(),
803        "emit completed"
804    );
805
806    Ok(())
807}
808
809/// Handle a single request in the plugin thread
810async fn handle_request(
811    request: PluginRequest,
812    runtime: Rc<RefCell<QuickJsBackend>>,
813    plugins: &mut HashMap<String, TsPluginInfo>,
814) -> bool {
815    match request {
816        PluginRequest::LoadPlugin { path, response } => {
817            let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
818            let _ = response.send(result);
819        }
820
821        PluginRequest::LoadPluginsFromDir { dir, response } => {
822            let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
823            let _ = response.send(errors);
824        }
825
826        PluginRequest::LoadPluginsFromDirWithConfig {
827            dir,
828            plugin_configs,
829            response,
830        } => {
831            let (errors, discovered) = load_plugins_from_dir_with_config_internal(
832                Rc::clone(&runtime),
833                plugins,
834                &dir,
835                &plugin_configs,
836            )
837            .await;
838            let _ = response.send((errors, discovered));
839        }
840
841        PluginRequest::UnloadPlugin { name, response } => {
842            let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
843            let _ = response.send(result);
844        }
845
846        PluginRequest::ReloadPlugin { name, response } => {
847            let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
848            let _ = response.send(result);
849        }
850
851        PluginRequest::ExecuteAction {
852            action_name,
853            response,
854        } => {
855            // This is handled in plugin_thread_loop with select! for concurrent processing
856            // If we get here, it's an unexpected state
857            tracing::error!(
858                "ExecuteAction should be handled in main loop, not here: {}",
859                action_name
860            );
861            let _ = response.send(Err(anyhow::anyhow!(
862                "Internal error: ExecuteAction in wrong handler"
863            )));
864        }
865
866        PluginRequest::RunHook { hook_name, args } => {
867            // Fire-and-forget hook execution
868            let hook_start = std::time::Instant::now();
869            // Use info level for prompt hooks to aid debugging
870            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
871                tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
872            } else {
873                tracing::trace!(hook = %hook_name, "RunHook request received");
874            }
875            if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
876                let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
877                tracing::error!("{}", error_msg);
878                // Surface the error to the UI
879                runtime.borrow_mut().send_status(error_msg);
880            }
881            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
882                tracing::info!(
883                    hook = %hook_name,
884                    elapsed_ms = hook_start.elapsed().as_millis(),
885                    "RunHook completed (prompt hook)"
886                );
887            } else {
888                tracing::trace!(
889                    hook = %hook_name,
890                    elapsed_ms = hook_start.elapsed().as_millis(),
891                    "RunHook completed"
892                );
893            }
894        }
895
896        PluginRequest::HasHookHandlers {
897            hook_name,
898            response,
899        } => {
900            let has_handlers = runtime.borrow().has_handlers(&hook_name);
901            let _ = response.send(has_handlers);
902        }
903
904        PluginRequest::ListPlugins { response } => {
905            let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
906            let _ = response.send(plugin_list);
907        }
908
909        PluginRequest::ResolveCallback {
910            callback_id,
911            result_json,
912        } => {
913            tracing::info!(
914                "ResolveCallback: resolving callback_id={} with result_json={}",
915                callback_id,
916                result_json
917            );
918            runtime
919                .borrow_mut()
920                .resolve_callback(callback_id, &result_json);
921            // resolve_callback now runs execute_pending_job() internally
922            tracing::info!(
923                "ResolveCallback: done resolving callback_id={}",
924                callback_id
925            );
926        }
927
928        PluginRequest::RejectCallback { callback_id, error } => {
929            runtime.borrow_mut().reject_callback(callback_id, &error);
930            // reject_callback now runs execute_pending_job() internally
931        }
932
933        PluginRequest::Shutdown => {
934            tracing::info!("Plugin thread received shutdown request");
935            return true;
936        }
937    }
938
939    false
940}
941
942/// Load a plugin from a file
943///
944/// # Safety (clippy::await_holding_refcell_ref)
945/// The RefCell borrow held across await is safe because:
946/// - This runs on a single-threaded tokio runtime (no parallel task execution)
947/// - No spawn_local calls exist that could create concurrent access to `runtime`
948/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
949#[allow(clippy::await_holding_refcell_ref)]
950async fn load_plugin_internal(
951    runtime: Rc<RefCell<QuickJsBackend>>,
952    plugins: &mut HashMap<String, TsPluginInfo>,
953    path: &Path,
954) -> Result<()> {
955    let plugin_name = path
956        .file_stem()
957        .and_then(|s| s.to_str())
958        .ok_or_else(|| anyhow!("Invalid plugin filename"))?
959        .to_string();
960
961    tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
962    tracing::debug!(
963        "load_plugin_internal: starting module load for plugin '{}'",
964        plugin_name
965    );
966
967    // Load and execute the module, passing plugin name for command registration
968    let path_str = path
969        .to_str()
970        .ok_or_else(|| anyhow!("Invalid path encoding"))?;
971
972    // Try to load accompanying .i18n.json file
973    let i18n_path = path.with_extension("i18n.json");
974    if i18n_path.exists() {
975        if let Ok(content) = std::fs::read_to_string(&i18n_path) {
976            if let Ok(strings) = serde_json::from_str::<
977                std::collections::HashMap<String, std::collections::HashMap<String, String>>,
978            >(&content)
979            {
980                runtime
981                    .borrow_mut()
982                    .services
983                    .register_plugin_strings(&plugin_name, strings);
984                tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
985            }
986        }
987    }
988
989    let load_start = std::time::Instant::now();
990    runtime
991        .borrow_mut()
992        .load_module_with_source(path_str, &plugin_name)
993        .await?;
994    let load_elapsed = load_start.elapsed();
995
996    tracing::debug!(
997        "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
998        plugin_name,
999        load_elapsed
1000    );
1001
1002    // Store plugin info
1003    plugins.insert(
1004        plugin_name.clone(),
1005        TsPluginInfo {
1006            name: plugin_name.clone(),
1007            path: path.to_path_buf(),
1008            enabled: true,
1009        },
1010    );
1011
1012    tracing::debug!(
1013        "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1014        plugin_name,
1015        plugins.len()
1016    );
1017
1018    Ok(())
1019}
1020
1021/// Load all plugins from a directory
1022async fn load_plugins_from_dir_internal(
1023    runtime: Rc<RefCell<QuickJsBackend>>,
1024    plugins: &mut HashMap<String, TsPluginInfo>,
1025    dir: &Path,
1026) -> Vec<String> {
1027    tracing::debug!(
1028        "load_plugins_from_dir_internal: scanning directory {:?}",
1029        dir
1030    );
1031    let mut errors = Vec::new();
1032
1033    if !dir.exists() {
1034        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1035        return errors;
1036    }
1037
1038    // Scan directory for .ts and .js files
1039    match std::fs::read_dir(dir) {
1040        Ok(entries) => {
1041            for entry in entries.flatten() {
1042                let path = entry.path();
1043                let ext = path.extension().and_then(|s| s.to_str());
1044                if ext == Some("ts") || ext == Some("js") {
1045                    tracing::debug!(
1046                        "load_plugins_from_dir_internal: attempting to load {:?}",
1047                        path
1048                    );
1049                    if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1050                    {
1051                        let err = format!("Failed to load {:?}: {}", path, e);
1052                        tracing::error!("{}", err);
1053                        errors.push(err);
1054                    }
1055                }
1056            }
1057
1058            tracing::debug!(
1059                "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1060                dir,
1061                errors.len()
1062            );
1063        }
1064        Err(e) => {
1065            let err = format!("Failed to read plugin directory: {}", e);
1066            tracing::error!("{}", err);
1067            errors.push(err);
1068        }
1069    }
1070
1071    errors
1072}
1073
1074/// Load all plugins from a directory with config support
1075/// Returns (errors, discovered_plugins) where discovered_plugins contains all
1076/// found plugin files with their configs (respecting enabled state from provided configs)
1077async fn load_plugins_from_dir_with_config_internal(
1078    runtime: Rc<RefCell<QuickJsBackend>>,
1079    plugins: &mut HashMap<String, TsPluginInfo>,
1080    dir: &Path,
1081    plugin_configs: &HashMap<String, PluginConfig>,
1082) -> (Vec<String>, HashMap<String, PluginConfig>) {
1083    tracing::debug!(
1084        "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1085        dir
1086    );
1087    let mut errors = Vec::new();
1088    let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1089
1090    if !dir.exists() {
1091        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1092        return (errors, discovered_plugins);
1093    }
1094
1095    // First pass: scan directory and collect all plugin files
1096    let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1097    match std::fs::read_dir(dir) {
1098        Ok(entries) => {
1099            for entry in entries.flatten() {
1100                let path = entry.path();
1101                let ext = path.extension().and_then(|s| s.to_str());
1102                if ext == Some("ts") || ext == Some("js") {
1103                    // Skip .i18n.json files (they're not plugins)
1104                    if path.to_string_lossy().contains(".i18n.") {
1105                        continue;
1106                    }
1107                    // Get plugin name from filename (without extension)
1108                    let plugin_name = path
1109                        .file_stem()
1110                        .and_then(|s| s.to_str())
1111                        .unwrap_or("unknown")
1112                        .to_string();
1113                    plugin_files.push((plugin_name, path));
1114                }
1115            }
1116        }
1117        Err(e) => {
1118            let err = format!("Failed to read plugin directory: {}", e);
1119            tracing::error!("{}", err);
1120            errors.push(err);
1121            return (errors, discovered_plugins);
1122        }
1123    }
1124
1125    // Second pass: build discovered_plugins map and load enabled plugins
1126    for (plugin_name, path) in plugin_files {
1127        // Check if we have an existing config for this plugin
1128        let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1129            // Use existing config but ensure path is set
1130            PluginConfig {
1131                enabled: existing_config.enabled,
1132                path: Some(path.clone()),
1133            }
1134        } else {
1135            // Create new config with default enabled = true
1136            PluginConfig::new_with_path(path.clone())
1137        };
1138
1139        // Add to discovered plugins
1140        discovered_plugins.insert(plugin_name.clone(), config.clone());
1141
1142        // Only load if enabled
1143        if config.enabled {
1144            tracing::debug!(
1145                "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1146                plugin_name
1147            );
1148            if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1149                let err = format!("Failed to load {:?}: {}", path, e);
1150                tracing::error!("{}", err);
1151                errors.push(err);
1152            }
1153        } else {
1154            tracing::info!(
1155                "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1156                plugin_name
1157            );
1158        }
1159    }
1160
1161    tracing::debug!(
1162        "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1163        discovered_plugins.len(),
1164        errors.len()
1165    );
1166
1167    (errors, discovered_plugins)
1168}
1169
1170/// Unload a plugin
1171fn unload_plugin_internal(
1172    runtime: Rc<RefCell<QuickJsBackend>>,
1173    plugins: &mut HashMap<String, TsPluginInfo>,
1174    name: &str,
1175) -> Result<()> {
1176    if plugins.remove(name).is_some() {
1177        tracing::info!("Unloading TypeScript plugin: {}", name);
1178
1179        // Unregister i18n strings
1180        runtime
1181            .borrow_mut()
1182            .services
1183            .unregister_plugin_strings(name);
1184
1185        // Remove all commands registered by this plugin
1186        runtime
1187            .borrow()
1188            .services
1189            .unregister_commands_by_plugin(name);
1190
1191        Ok(())
1192    } else {
1193        Err(anyhow!("Plugin '{}' not found", name))
1194    }
1195}
1196
1197/// Reload a plugin
1198async fn reload_plugin_internal(
1199    runtime: Rc<RefCell<QuickJsBackend>>,
1200    plugins: &mut HashMap<String, TsPluginInfo>,
1201    name: &str,
1202) -> Result<()> {
1203    let path = plugins
1204        .get(name)
1205        .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1206        .path
1207        .clone();
1208
1209    unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1210    load_plugin_internal(runtime, plugins, &path).await?;
1211
1212    Ok(())
1213}
1214
1215#[cfg(test)]
1216mod tests {
1217    use super::*;
1218    use fresh_core::hooks::hook_args_to_json;
1219
1220    #[test]
1221    fn test_oneshot_channel() {
1222        let (tx, rx) = oneshot::channel::<i32>();
1223        assert!(tx.send(42).is_ok());
1224        assert_eq!(rx.recv().unwrap(), 42);
1225    }
1226
1227    #[test]
1228    fn test_hook_args_to_json_editor_initialized() {
1229        let args = HookArgs::EditorInitialized;
1230        let json = hook_args_to_json(&args).unwrap();
1231        assert_eq!(json, "{}");
1232    }
1233
1234    #[test]
1235    fn test_hook_args_to_json_prompt_changed() {
1236        let args = HookArgs::PromptChanged {
1237            prompt_type: "search".to_string(),
1238            input: "test".to_string(),
1239        };
1240        let json = hook_args_to_json(&args).unwrap();
1241        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
1242        assert_eq!(parsed["prompt_type"], "search");
1243        assert_eq!(parsed["input"], "test");
1244    }
1245}