Skip to main content

fresh_plugin_runtime/
thread.rs

//! Plugin Thread: Dedicated thread for TypeScript plugin execution
//!
//! This module implements a dedicated thread architecture for plugin execution,
//! using QuickJS as the JavaScript runtime with oxc for TypeScript transpilation.
//!
//! Architecture:
//! - The main thread (UI) sends requests to the plugin thread via a channel
//! - The plugin thread owns the QuickJS runtime and a persistent tokio runtime
//! - Results are sent back via the existing PluginCommand channel
//! - Async operations complete naturally without runtime destruction
11
12use crate::backend::quickjs_backend::{PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25// Re-export PluginConfig from fresh-core
26pub use fresh_core::config::PluginConfig;
27
28/// Request messages sent to the plugin thread
29#[derive(Debug)]
30pub enum PluginRequest {
31    /// Load a plugin from a file
32    LoadPlugin {
33        path: PathBuf,
34        response: oneshot::Sender<Result<()>>,
35    },
36
37    /// Resolve an async callback with a result (for async operations like SpawnProcess, Delay)
38    ResolveCallback {
39        callback_id: fresh_core::api::JsCallbackId,
40        result_json: String,
41    },
42
43    /// Reject an async callback with an error
44    RejectCallback {
45        callback_id: fresh_core::api::JsCallbackId,
46        error: String,
47    },
48
49    /// Load all plugins from a directory
50    LoadPluginsFromDir {
51        dir: PathBuf,
52        response: oneshot::Sender<Vec<String>>,
53    },
54
55    /// Load all plugins from a directory with config support
56    /// Returns (errors, discovered_plugins) where discovered_plugins contains
57    /// all found plugins with their paths and enabled status
58    LoadPluginsFromDirWithConfig {
59        dir: PathBuf,
60        plugin_configs: HashMap<String, PluginConfig>,
61        response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
62    },
63
64    /// Unload a plugin by name
65    UnloadPlugin {
66        name: String,
67        response: oneshot::Sender<Result<()>>,
68    },
69
70    /// Reload a plugin by name
71    ReloadPlugin {
72        name: String,
73        response: oneshot::Sender<Result<()>>,
74    },
75
76    /// Execute a plugin action
77    ExecuteAction {
78        action_name: String,
79        response: oneshot::Sender<Result<()>>,
80    },
81
82    /// Run a hook (fire-and-forget, no response needed)
83    RunHook { hook_name: String, args: HookArgs },
84
85    /// Check if any handlers are registered for a hook
86    HasHookHandlers {
87        hook_name: String,
88        response: oneshot::Sender<bool>,
89    },
90
91    /// List all loaded plugins
92    ListPlugins {
93        response: oneshot::Sender<Vec<TsPluginInfo>>,
94    },
95
96    /// Shutdown the plugin thread
97    Shutdown,
98}
99
/// Simple oneshot channel implementation.
///
/// A thin wrapper over a bounded `std::sync::mpsc` channel with capacity one.
/// [`Sender::send`] consumes the sender, so each channel carries at most one
/// value, and that single send never blocks because the buffer slot is free.
pub mod oneshot {
    use std::fmt;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Sending half of a oneshot channel; consumed by [`Sender::send`].
    pub struct Sender<T>(mpsc::SyncSender<T>);

    /// Receiving half of a oneshot channel.
    pub struct Receiver<T>(mpsc::Receiver<T>);

    // Debug is implemented by hand (printing only the type name) so that the
    // halves are `Debug` regardless of whether `T` is.
    impl<T> fmt::Debug for Sender<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Sender").finish()
        }
    }

    impl<T> fmt::Debug for Receiver<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Receiver").finish()
        }
    }

    impl<T> Sender<T> {
        /// Send `value` to the receiver, consuming the sender.
        ///
        /// # Errors
        ///
        /// Returns the value back as `Err(value)` if the receiver has already
        /// been dropped.
        pub fn send(self, value: T) -> Result<(), T> {
            self.0.send(value).map_err(|mpsc::SendError(unsent)| unsent)
        }
    }

    impl<T> Receiver<T> {
        /// Block until the value arrives, consuming the receiver.
        ///
        /// # Errors
        ///
        /// Returns `RecvError` if the sender was dropped without sending.
        pub fn recv(self) -> Result<T, mpsc::RecvError> {
            self.0.recv()
        }

        /// Block for at most `timeout` waiting for the value, consuming the
        /// receiver.
        ///
        /// # Errors
        ///
        /// Returns `RecvTimeoutError::Timeout` if nothing arrived in time, or
        /// `RecvTimeoutError::Disconnected` if the sender was dropped without
        /// sending.
        pub fn recv_timeout(self, timeout: Duration) -> Result<T, mpsc::RecvTimeoutError> {
            self.0.recv_timeout(timeout)
        }

        /// Poll for the value without blocking; the receiver stays usable.
        ///
        /// # Errors
        ///
        /// Returns `TryRecvError::Empty` if nothing has been sent yet, or
        /// `TryRecvError::Disconnected` if the sender was dropped without
        /// sending.
        pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
            self.0.try_recv()
        }
    }

    /// Create a connected `(Sender, Receiver)` pair.
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        // Capacity 1: the one permitted send always finds a free slot.
        let (tx, rx) = mpsc::sync_channel(1);
        (Sender(tx), Receiver(rx))
    }
}
150
151/// Handle to the plugin thread for sending requests
152pub struct PluginThreadHandle {
153    /// Channel to send requests to the plugin thread
154    /// Wrapped in Option so we can drop it to signal shutdown
155    request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,
156
157    /// Thread join handle
158    thread_handle: Option<JoinHandle<()>>,
159
160    /// State snapshot handle for editor to update
161    state_snapshot: Arc<RwLock<EditorStateSnapshot>>,
162
163    /// Pending response senders for async operations (shared with runtime)
164    pending_responses: PendingResponses,
165
166    /// Receiver for plugin commands (polled by editor directly)
167    command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
168}
169
170impl PluginThreadHandle {
171    /// Create a new plugin thread and return its handle
172    pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
173        tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
174
175        // Create channel for plugin commands
176        let (command_sender, command_receiver) = std::sync::mpsc::channel();
177
178        // Create editor state snapshot for query API
179        let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
180
181        // Create pending responses map (shared between handle and runtime)
182        let pending_responses: PendingResponses =
183            Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
184        let thread_pending_responses = Arc::clone(&pending_responses);
185
186        // Create channel for requests (unbounded allows sync send, async recv)
187        let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
188
189        // Clone state snapshot for the thread
190        let thread_state_snapshot = Arc::clone(&state_snapshot);
191
192        // Spawn the plugin thread
193        tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
194        let thread_handle = thread::spawn(move || {
195            tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
196            // Create tokio runtime for the plugin thread
197            let rt = match tokio::runtime::Builder::new_current_thread()
198                .enable_all()
199                .build()
200            {
201                Ok(rt) => {
202                    tracing::debug!("Plugin thread: tokio runtime created successfully");
203                    rt
204                }
205                Err(e) => {
206                    tracing::error!("Failed to create plugin thread runtime: {}", e);
207                    return;
208                }
209            };
210
211            // Create QuickJS runtime with state
212            tracing::debug!("Plugin thread: creating QuickJS runtime");
213            let runtime = match QuickJsBackend::with_state_and_responses(
214                Arc::clone(&thread_state_snapshot),
215                command_sender,
216                thread_pending_responses,
217                services.clone(),
218            ) {
219                Ok(rt) => {
220                    tracing::debug!("Plugin thread: QuickJS runtime created successfully");
221                    rt
222                }
223                Err(e) => {
224                    tracing::error!("Failed to create QuickJS runtime: {}", e);
225                    return;
226                }
227            };
228
229            // Create internal manager state
230            let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
231
232            // Run the event loop with a LocalSet to allow concurrent task execution
233            tracing::debug!("Plugin thread: starting event loop with LocalSet");
234            let local = tokio::task::LocalSet::new();
235            local.block_on(&rt, async {
236                // Wrap runtime in RefCell for interior mutability during concurrent operations
237                let runtime = Rc::new(RefCell::new(runtime));
238                tracing::debug!("Plugin thread: entering plugin_thread_loop");
239                plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
240            });
241
242            tracing::info!("Plugin thread shutting down");
243        });
244
245        tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
246        tracing::info!("Plugin thread spawned");
247
248        Ok(Self {
249            request_sender: Some(request_sender),
250            thread_handle: Some(thread_handle),
251            state_snapshot,
252            pending_responses,
253            command_receiver,
254        })
255    }
256
257    /// Check if the plugin thread is still alive
258    pub fn is_alive(&self) -> bool {
259        self.thread_handle
260            .as_ref()
261            .map(|h| !h.is_finished())
262            .unwrap_or(false)
263    }
264
265    /// Check thread health and panic if the plugin thread died due to a panic.
266    /// This propagates plugin thread panics to the calling thread.
267    /// Call this periodically to detect plugin thread failures.
268    pub fn check_thread_health(&mut self) {
269        if let Some(handle) = &self.thread_handle {
270            if handle.is_finished() {
271                tracing::error!(
272                    "check_thread_health: plugin thread is finished, checking for panic"
273                );
274                // Thread finished - take ownership and check result
275                if let Some(handle) = self.thread_handle.take() {
276                    match handle.join() {
277                        Ok(()) => {
278                            tracing::warn!("Plugin thread exited normally (unexpected)");
279                        }
280                        Err(panic_payload) => {
281                            // Re-panic with the original panic message to propagate it
282                            std::panic::resume_unwind(panic_payload);
283                        }
284                    }
285                }
286            }
287        }
288    }
289
290    /// Deliver a response to a pending async operation in the plugin
291    ///
292    /// This is called by the editor after processing a command that requires a response.
293    pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
294        // First try to find a pending Rust request (oneshot channel)
295        if respond_to_pending(&self.pending_responses, response.clone()) {
296            return;
297        }
298
299        // If not found, it must be a JS callback
300        use fresh_core::api::{JsCallbackId, PluginResponse};
301
302        match response {
303            PluginResponse::VirtualBufferCreated {
304                request_id,
305                buffer_id,
306                split_id,
307            } => {
308                // Return an object with bufferId and splitId (camelCase for JS)
309                let result = serde_json::json!({
310                    "bufferId": buffer_id.0,
311                    "splitId": split_id.map(|s| s.0)
312                });
313                self.resolve_callback(JsCallbackId(request_id), result.to_string());
314            }
315            PluginResponse::LspRequest { request_id, result } => match result {
316                Ok(value) => {
317                    self.resolve_callback(JsCallbackId(request_id), value.to_string());
318                }
319                Err(e) => {
320                    self.reject_callback(JsCallbackId(request_id), e);
321                }
322            },
323            PluginResponse::HighlightsComputed { request_id, spans } => {
324                let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
325                self.resolve_callback(JsCallbackId(request_id), result);
326            }
327            PluginResponse::BufferText { request_id, text } => match text {
328                Ok(content) => {
329                    // JSON stringify the content string
330                    let result =
331                        serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
332                    self.resolve_callback(JsCallbackId(request_id), result);
333                }
334                Err(e) => {
335                    self.reject_callback(JsCallbackId(request_id), e);
336                }
337            },
338            PluginResponse::CompositeBufferCreated {
339                request_id,
340                buffer_id,
341            } => {
342                // Return just the buffer_id number, not an object
343                self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
344            }
345            PluginResponse::LineStartPosition {
346                request_id,
347                position,
348            } => {
349                // Return the position as a number or null
350                let result =
351                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
352                self.resolve_callback(JsCallbackId(request_id), result);
353            }
354            PluginResponse::LineEndPosition {
355                request_id,
356                position,
357            } => {
358                // Return the position as a number or null
359                let result =
360                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
361                self.resolve_callback(JsCallbackId(request_id), result);
362            }
363            PluginResponse::BufferLineCount { request_id, count } => {
364                // Return the count as a number or null
365                let result = serde_json::to_string(&count).unwrap_or_else(|_| "null".to_string());
366                self.resolve_callback(JsCallbackId(request_id), result);
367            }
368            PluginResponse::TerminalCreated {
369                request_id,
370                buffer_id,
371                terminal_id,
372                split_id,
373            } => {
374                let result = serde_json::json!({
375                    "bufferId": buffer_id.0,
376                    "terminalId": terminal_id.0,
377                    "splitId": split_id.map(|s| s.0)
378                });
379                self.resolve_callback(JsCallbackId(request_id), result.to_string());
380            }
381            PluginResponse::SplitByLabel {
382                request_id,
383                split_id,
384            } => {
385                let result = serde_json::to_string(&split_id.map(|s| s.0))
386                    .unwrap_or_else(|_| "null".to_string());
387                self.resolve_callback(JsCallbackId(request_id), result);
388            }
389        }
390    }
391
392    /// Load a plugin from a file (blocking)
393    pub fn load_plugin(&self, path: &Path) -> Result<()> {
394        let (tx, rx) = oneshot::channel();
395        self.request_sender
396            .as_ref()
397            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
398            .send(PluginRequest::LoadPlugin {
399                path: path.to_path_buf(),
400                response: tx,
401            })
402            .map_err(|_| anyhow!("Plugin thread not responding"))?;
403
404        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
405    }
406
407    /// Load all plugins from a directory (blocking)
408    pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
409        let (tx, rx) = oneshot::channel();
410        let Some(sender) = self.request_sender.as_ref() else {
411            return vec!["Plugin thread shut down".to_string()];
412        };
413        if sender
414            .send(PluginRequest::LoadPluginsFromDir {
415                dir: dir.to_path_buf(),
416                response: tx,
417            })
418            .is_err()
419        {
420            return vec!["Plugin thread not responding".to_string()];
421        }
422
423        rx.recv()
424            .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
425    }
426
427    /// Load all plugins from a directory with config support (blocking)
428    /// Returns (errors, discovered_plugins) where discovered_plugins is a map of
429    /// plugin name -> PluginConfig with paths populated.
430    pub fn load_plugins_from_dir_with_config(
431        &self,
432        dir: &Path,
433        plugin_configs: &HashMap<String, PluginConfig>,
434    ) -> (Vec<String>, HashMap<String, PluginConfig>) {
435        let (tx, rx) = oneshot::channel();
436        let Some(sender) = self.request_sender.as_ref() else {
437            return (vec!["Plugin thread shut down".to_string()], HashMap::new());
438        };
439        if sender
440            .send(PluginRequest::LoadPluginsFromDirWithConfig {
441                dir: dir.to_path_buf(),
442                plugin_configs: plugin_configs.clone(),
443                response: tx,
444            })
445            .is_err()
446        {
447            return (
448                vec!["Plugin thread not responding".to_string()],
449                HashMap::new(),
450            );
451        }
452
453        rx.recv()
454            .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
455    }
456
457    /// Unload a plugin (blocking)
458    pub fn unload_plugin(&self, name: &str) -> Result<()> {
459        let (tx, rx) = oneshot::channel();
460        self.request_sender
461            .as_ref()
462            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
463            .send(PluginRequest::UnloadPlugin {
464                name: name.to_string(),
465                response: tx,
466            })
467            .map_err(|_| anyhow!("Plugin thread not responding"))?;
468
469        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
470    }
471
472    /// Reload a plugin (blocking)
473    pub fn reload_plugin(&self, name: &str) -> Result<()> {
474        let (tx, rx) = oneshot::channel();
475        self.request_sender
476            .as_ref()
477            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
478            .send(PluginRequest::ReloadPlugin {
479                name: name.to_string(),
480                response: tx,
481            })
482            .map_err(|_| anyhow!("Plugin thread not responding"))?;
483
484        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
485    }
486
487    /// Execute a plugin action (non-blocking)
488    ///
489    /// Returns a receiver that will receive the result when the action completes.
490    /// The caller should poll this while processing commands to avoid deadlock.
491    pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
492        tracing::trace!("execute_action_async: starting action '{}'", action_name);
493        let (tx, rx) = oneshot::channel();
494        self.request_sender
495            .as_ref()
496            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
497            .send(PluginRequest::ExecuteAction {
498                action_name: action_name.to_string(),
499                response: tx,
500            })
501            .map_err(|_| anyhow!("Plugin thread not responding"))?;
502
503        tracing::trace!("execute_action_async: request sent for '{}'", action_name);
504        Ok(rx)
505    }
506
507    /// Run a hook (non-blocking, fire-and-forget)
508    ///
509    /// This is the key improvement: hooks are now non-blocking.
510    /// The plugin thread will execute them asynchronously and
511    /// any results will come back via the PluginCommand channel.
512    pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
513        if let Some(sender) = self.request_sender.as_ref() {
514            let _ = sender.send(PluginRequest::RunHook {
515                hook_name: hook_name.to_string(),
516                args,
517            });
518        }
519    }
520
521    /// Check if any handlers are registered for a hook (blocking)
522    pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
523        let (tx, rx) = oneshot::channel();
524        let Some(sender) = self.request_sender.as_ref() else {
525            return false;
526        };
527        if sender
528            .send(PluginRequest::HasHookHandlers {
529                hook_name: hook_name.to_string(),
530                response: tx,
531            })
532            .is_err()
533        {
534            return false;
535        }
536
537        rx.recv().unwrap_or(false)
538    }
539
540    /// List all loaded plugins (blocking)
541    pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
542        let (tx, rx) = oneshot::channel();
543        let Some(sender) = self.request_sender.as_ref() else {
544            return vec![];
545        };
546        if sender
547            .send(PluginRequest::ListPlugins { response: tx })
548            .is_err()
549        {
550            return vec![];
551        }
552
553        rx.recv().unwrap_or_default()
554    }
555
556    /// Process pending plugin commands (non-blocking)
557    ///
558    /// Returns immediately with any pending commands by polling the command queue directly.
559    /// This does not require the plugin thread to respond, avoiding deadlocks.
560    pub fn process_commands(&mut self) -> Vec<PluginCommand> {
561        let mut commands = Vec::new();
562        while let Ok(cmd) = self.command_receiver.try_recv() {
563            commands.push(cmd);
564        }
565        commands
566    }
567
568    /// Process commands, blocking until `HookCompleted` for the given hook arrives.
569    ///
570    /// After the render loop fires a hook like `lines_changed`, the plugin thread
571    /// processes it and sends back commands (AddConceal, etc.) followed by a
572    /// `HookCompleted` sentinel. This method waits for that sentinel so the
573    /// render has all conceal/overlay updates before painting the frame.
574    ///
575    /// Returns all non-sentinel commands collected while waiting.
576    /// Falls back to non-blocking drain if the timeout expires.
577    pub fn process_commands_until_hook_completed(
578        &mut self,
579        hook_name: &str,
580        timeout: std::time::Duration,
581    ) -> Vec<PluginCommand> {
582        let mut commands = Vec::new();
583        let deadline = std::time::Instant::now() + timeout;
584
585        loop {
586            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
587            if remaining.is_zero() {
588                // Timeout: drain whatever is available
589                while let Ok(cmd) = self.command_receiver.try_recv() {
590                    if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
591                        commands.push(cmd);
592                    }
593                }
594                break;
595            }
596
597            match self.command_receiver.recv_timeout(remaining) {
598                Ok(PluginCommand::HookCompleted {
599                    hook_name: ref name,
600                }) if name == hook_name => {
601                    // Got our sentinel — drain any remaining commands
602                    while let Ok(cmd) = self.command_receiver.try_recv() {
603                        if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
604                            commands.push(cmd);
605                        }
606                    }
607                    break;
608                }
609                Ok(PluginCommand::HookCompleted { .. }) => {
610                    // Sentinel for a different hook, keep waiting
611                    continue;
612                }
613                Ok(cmd) => {
614                    commands.push(cmd);
615                }
616                Err(_) => {
617                    // Timeout or disconnected
618                    break;
619                }
620            }
621        }
622
623        commands
624    }
625
626    /// Get the state snapshot handle for editor to update
627    pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
628        Arc::clone(&self.state_snapshot)
629    }
630
631    /// Shutdown the plugin thread
632    pub fn shutdown(&mut self) {
633        tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
634
635        // Drop all pending response senders - this wakes up any plugin code waiting for responses
636        // by causing their oneshot receivers to return an error
637        if let Ok(mut pending) = self.pending_responses.lock() {
638            if !pending.is_empty() {
639                tracing::warn!(
640                    "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
641                    pending.len(),
642                    pending.keys().collect::<Vec<_>>()
643                );
644                pending.clear(); // Drop all senders, waking up waiting receivers
645            }
646        }
647
648        // First send a Shutdown request to allow clean processing of pending work
649        if let Some(sender) = self.request_sender.as_ref() {
650            tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
651            let _ = sender.send(PluginRequest::Shutdown);
652        }
653
654        // Then drop the sender to close the channel - this reliably wakes the receiver
655        // even when it's parked in a tokio LocalSet (the Shutdown message above may not wake it)
656        tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
657        self.request_sender.take();
658
659        if let Some(handle) = self.thread_handle.take() {
660            tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
661            let _ = handle.join();
662            tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
663        }
664
665        tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
666    }
667
668    /// Resolve an async callback in the plugin runtime
669    /// Called by the app when async operations (SpawnProcess, Delay) complete
670    pub fn resolve_callback(
671        &self,
672        callback_id: fresh_core::api::JsCallbackId,
673        result_json: String,
674    ) {
675        if let Some(sender) = self.request_sender.as_ref() {
676            let _ = sender.send(PluginRequest::ResolveCallback {
677                callback_id,
678                result_json,
679            });
680        }
681    }
682
683    /// Reject an async callback in the plugin runtime
684    /// Called by the app when async operations fail
685    pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
686        if let Some(sender) = self.request_sender.as_ref() {
687            let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
688        }
689    }
690}
691
692impl Drop for PluginThreadHandle {
693    fn drop(&mut self) {
694        self.shutdown();
695    }
696}
697
698fn respond_to_pending(
699    pending_responses: &PendingResponses,
700    response: fresh_core::api::PluginResponse,
701) -> bool {
702    let request_id = match &response {
703        fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
704        fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
705        fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
706        fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
707        fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
708        fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
709        fresh_core::api::PluginResponse::LineEndPosition { request_id, .. } => *request_id,
710        fresh_core::api::PluginResponse::BufferLineCount { request_id, .. } => *request_id,
711        fresh_core::api::PluginResponse::TerminalCreated { request_id, .. } => *request_id,
712        fresh_core::api::PluginResponse::SplitByLabel { request_id, .. } => *request_id,
713    };
714
715    let sender = {
716        let mut pending = pending_responses.lock().unwrap();
717        pending.remove(&request_id)
718    };
719
720    if let Some(tx) = sender {
721        let _ = tx.send(response);
722        true
723    } else {
724        false
725    }
726}
727
/// Unit tests for `respond_to_pending`.
#[cfg(test)]
mod plugin_thread_tests {
    use super::*;
    use fresh_core::api::PluginResponse;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tokio::sync::oneshot;

    /// An LSP response is forwarded to the registered sender and the entry is
    /// removed from the pending map.
    #[test]
    fn respond_to_pending_sends_lsp_response() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(123, tx);

        let lsp_response = PluginResponse::LspRequest {
            request_id: 123,
            result: Ok(json!({ "key": "value" })),
        };
        respond_to_pending(&pending, lsp_response);

        let received = rx.try_recv().expect("expected response");
        let PluginResponse::LspRequest { result, .. } = received else {
            panic!("unexpected variant");
        };
        assert_eq!(result.unwrap(), json!({ "key": "value" }));

        assert!(pending.lock().unwrap().is_empty());
    }

    /// A virtual-buffer response is forwarded to the registered sender and the
    /// entry is removed from the pending map.
    #[test]
    fn respond_to_pending_handles_virtual_buffer_created() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(456, tx);

        let created = PluginResponse::VirtualBufferCreated {
            request_id: 456,
            buffer_id: fresh_core::BufferId(7),
            split_id: Some(fresh_core::SplitId(1)),
        };
        respond_to_pending(&pending, created);

        let received = rx.try_recv().expect("expected response");
        let PluginResponse::VirtualBufferCreated { buffer_id, .. } = received else {
            panic!("unexpected variant");
        };
        assert_eq!(buffer_id.0, 7);

        assert!(pending.lock().unwrap().is_empty());
    }
}
788
789/// Main loop for the plugin thread
790///
791/// Uses `tokio::select!` to interleave request handling with periodic event loop
792/// polling. This allows long-running promises (like process spawns) to make progress
793/// even when no requests are coming in, preventing the UI from getting stuck.
794async fn plugin_thread_loop(
795    runtime: Rc<RefCell<QuickJsBackend>>,
796    plugins: &mut HashMap<String, TsPluginInfo>,
797    mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
798) {
799    tracing::info!("Plugin thread event loop started");
800
801    // Interval for polling the JS event loop when there's pending work
802    let poll_interval = Duration::from_millis(1);
803    let mut has_pending_work = false;
804
805    loop {
806        // Check for fatal JS errors (e.g., unhandled promise rejections in test mode)
807        // These are set via set_fatal_js_error() because panicking inside FFI callbacks
808        // is caught by rquickjs and doesn't terminate the thread.
809        if crate::backend::has_fatal_js_error() {
810            if let Some(error_msg) = crate::backend::take_fatal_js_error() {
811                tracing::error!(
812                    "Fatal JS error detected, terminating plugin thread: {}",
813                    error_msg
814                );
815                panic!("Fatal plugin error: {}", error_msg);
816            }
817        }
818
819        tokio::select! {
820            biased; // Prefer handling requests over polling
821
822            request = request_receiver.recv() => {
823                match request {
824                    Some(PluginRequest::ExecuteAction {
825                        action_name,
826                        response,
827                    }) => {
828                        // Start the action without blocking - this allows us to process
829                        // ResolveCallback requests that the action may be waiting for.
830                        let result = runtime.borrow_mut().start_action(&action_name);
831                        let _ = response.send(result);
832                        has_pending_work = true; // Action may have started async work
833                    }
834                    Some(request) => {
835                        let should_shutdown =
836                            handle_request(request, Rc::clone(&runtime), plugins).await;
837
838                        if should_shutdown {
839                            break;
840                        }
841                        has_pending_work = true; // Request may have started async work
842                    }
843                    None => {
844                        // Channel closed
845                        tracing::info!("Plugin thread request channel closed");
846                        break;
847                    }
848                }
849            }
850
851            // Poll the JS event loop periodically to make progress on pending promises
852            _ = tokio::time::sleep(poll_interval), if has_pending_work => {
853                has_pending_work = runtime.borrow_mut().poll_event_loop_once();
854            }
855        }
856    }
857}
858
859/// Execute an action while processing incoming hook requests concurrently.
860///
861/// This prevents deadlock when an action awaits a response from the main thread
862/// while the main thread is waiting for a blocking hook to complete.
863///
864/// # Safety (clippy::await_holding_refcell_ref)
865/// The RefCell borrow held across await is safe because:
866/// - This runs on a single-threaded tokio runtime (no parallel task execution)
867/// - No spawn_local calls exist that could create concurrent access to `runtime`
868/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
869
870/// Run a hook with Rc<RefCell<QuickJsBackend>>
871///
872/// # Safety (clippy::await_holding_refcell_ref)
873/// The RefCell borrow held across await is safe because:
874/// - This runs on a single-threaded tokio runtime (no parallel task execution)
875/// - No spawn_local calls exist that could create concurrent access to `runtime`
876/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
877#[allow(clippy::await_holding_refcell_ref)]
878async fn run_hook_internal_rc(
879    runtime: Rc<RefCell<QuickJsBackend>>,
880    hook_name: &str,
881    args: &HookArgs,
882) -> Result<()> {
883    // Convert HookArgs to serde_json::Value using hook_args_to_json which produces flat JSON
884    // (not enum-tagged JSON from serde's default Serialize)
885    let json_start = std::time::Instant::now();
886    let json_data = fresh_core::hooks::hook_args_to_json(args)?;
887    tracing::trace!(
888        hook = hook_name,
889        json_us = json_start.elapsed().as_micros(),
890        "hook args serialized"
891    );
892
893    // Emit to TypeScript handlers
894    let emit_start = std::time::Instant::now();
895    runtime.borrow_mut().emit(hook_name, &json_data).await?;
896    tracing::trace!(
897        hook = hook_name,
898        emit_ms = emit_start.elapsed().as_millis(),
899        "emit completed"
900    );
901
902    Ok(())
903}
904
905/// Handle a single request in the plugin thread
906async fn handle_request(
907    request: PluginRequest,
908    runtime: Rc<RefCell<QuickJsBackend>>,
909    plugins: &mut HashMap<String, TsPluginInfo>,
910) -> bool {
911    match request {
912        PluginRequest::LoadPlugin { path, response } => {
913            let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
914            let _ = response.send(result);
915        }
916
917        PluginRequest::LoadPluginsFromDir { dir, response } => {
918            let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
919            let _ = response.send(errors);
920        }
921
922        PluginRequest::LoadPluginsFromDirWithConfig {
923            dir,
924            plugin_configs,
925            response,
926        } => {
927            let (errors, discovered) = load_plugins_from_dir_with_config_internal(
928                Rc::clone(&runtime),
929                plugins,
930                &dir,
931                &plugin_configs,
932            )
933            .await;
934            let _ = response.send((errors, discovered));
935        }
936
937        PluginRequest::UnloadPlugin { name, response } => {
938            let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
939            let _ = response.send(result);
940        }
941
942        PluginRequest::ReloadPlugin { name, response } => {
943            let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
944            let _ = response.send(result);
945        }
946
947        PluginRequest::ExecuteAction {
948            action_name,
949            response,
950        } => {
951            // This is handled in plugin_thread_loop with select! for concurrent processing
952            // If we get here, it's an unexpected state
953            tracing::error!(
954                "ExecuteAction should be handled in main loop, not here: {}",
955                action_name
956            );
957            let _ = response.send(Err(anyhow::anyhow!(
958                "Internal error: ExecuteAction in wrong handler"
959            )));
960        }
961
962        PluginRequest::RunHook { hook_name, args } => {
963            // Fire-and-forget hook execution
964            let hook_start = std::time::Instant::now();
965            // Use info level for prompt hooks to aid debugging
966            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
967                tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
968            } else {
969                tracing::trace!(hook = %hook_name, "RunHook request received");
970            }
971            if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
972                let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
973                tracing::error!("{}", error_msg);
974                // Surface the error to the UI
975                runtime.borrow_mut().send_status(error_msg);
976            }
977            // Send sentinel so the main thread can wait deterministically
978            // for all commands from this hook to be available.
979            runtime.borrow().send_hook_completed(hook_name.clone());
980            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
981                tracing::info!(
982                    hook = %hook_name,
983                    elapsed_ms = hook_start.elapsed().as_millis(),
984                    "RunHook completed (prompt hook)"
985                );
986            } else {
987                tracing::trace!(
988                    hook = %hook_name,
989                    elapsed_ms = hook_start.elapsed().as_millis(),
990                    "RunHook completed"
991                );
992            }
993        }
994
995        PluginRequest::HasHookHandlers {
996            hook_name,
997            response,
998        } => {
999            let has_handlers = runtime.borrow().has_handlers(&hook_name);
1000            let _ = response.send(has_handlers);
1001        }
1002
1003        PluginRequest::ListPlugins { response } => {
1004            let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
1005            let _ = response.send(plugin_list);
1006        }
1007
1008        PluginRequest::ResolveCallback {
1009            callback_id,
1010            result_json,
1011        } => {
1012            tracing::info!(
1013                "ResolveCallback: resolving callback_id={} with result_json={}",
1014                callback_id,
1015                result_json
1016            );
1017            runtime
1018                .borrow_mut()
1019                .resolve_callback(callback_id, &result_json);
1020            // resolve_callback now runs execute_pending_job() internally
1021            tracing::info!(
1022                "ResolveCallback: done resolving callback_id={}",
1023                callback_id
1024            );
1025        }
1026
1027        PluginRequest::RejectCallback { callback_id, error } => {
1028            runtime.borrow_mut().reject_callback(callback_id, &error);
1029            // reject_callback now runs execute_pending_job() internally
1030        }
1031
1032        PluginRequest::Shutdown => {
1033            tracing::info!("Plugin thread received shutdown request");
1034            return true;
1035        }
1036    }
1037
1038    false
1039}
1040
1041/// Load a plugin from a file
1042///
1043/// # Safety (clippy::await_holding_refcell_ref)
1044/// The RefCell borrow held across await is safe because:
1045/// - This runs on a single-threaded tokio runtime (no parallel task execution)
1046/// - No spawn_local calls exist that could create concurrent access to `runtime`
1047/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
1048#[allow(clippy::await_holding_refcell_ref)]
1049async fn load_plugin_internal(
1050    runtime: Rc<RefCell<QuickJsBackend>>,
1051    plugins: &mut HashMap<String, TsPluginInfo>,
1052    path: &Path,
1053) -> Result<()> {
1054    let plugin_name = path
1055        .file_stem()
1056        .and_then(|s| s.to_str())
1057        .ok_or_else(|| anyhow!("Invalid plugin filename"))?
1058        .to_string();
1059
1060    tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
1061    tracing::debug!(
1062        "load_plugin_internal: starting module load for plugin '{}'",
1063        plugin_name
1064    );
1065
1066    // Load and execute the module, passing plugin name for command registration
1067    let path_str = path
1068        .to_str()
1069        .ok_or_else(|| anyhow!("Invalid path encoding"))?;
1070
1071    // Try to load accompanying .i18n.json file
1072    let i18n_path = path.with_extension("i18n.json");
1073    if i18n_path.exists() {
1074        if let Ok(content) = std::fs::read_to_string(&i18n_path) {
1075            if let Ok(strings) = serde_json::from_str::<
1076                std::collections::HashMap<String, std::collections::HashMap<String, String>>,
1077            >(&content)
1078            {
1079                runtime
1080                    .borrow_mut()
1081                    .services
1082                    .register_plugin_strings(&plugin_name, strings);
1083                tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
1084            }
1085        }
1086    }
1087
1088    let load_start = std::time::Instant::now();
1089    runtime
1090        .borrow_mut()
1091        .load_module_with_source(path_str, &plugin_name)
1092        .await?;
1093    let load_elapsed = load_start.elapsed();
1094
1095    tracing::debug!(
1096        "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
1097        plugin_name,
1098        load_elapsed
1099    );
1100
1101    // Store plugin info
1102    plugins.insert(
1103        plugin_name.clone(),
1104        TsPluginInfo {
1105            name: plugin_name.clone(),
1106            path: path.to_path_buf(),
1107            enabled: true,
1108        },
1109    );
1110
1111    tracing::debug!(
1112        "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1113        plugin_name,
1114        plugins.len()
1115    );
1116
1117    Ok(())
1118}
1119
1120/// Load all plugins from a directory
1121async fn load_plugins_from_dir_internal(
1122    runtime: Rc<RefCell<QuickJsBackend>>,
1123    plugins: &mut HashMap<String, TsPluginInfo>,
1124    dir: &Path,
1125) -> Vec<String> {
1126    tracing::debug!(
1127        "load_plugins_from_dir_internal: scanning directory {:?}",
1128        dir
1129    );
1130    let mut errors = Vec::new();
1131
1132    if !dir.exists() {
1133        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1134        return errors;
1135    }
1136
1137    // Scan directory for .ts and .js files
1138    match std::fs::read_dir(dir) {
1139        Ok(entries) => {
1140            for entry in entries.flatten() {
1141                let path = entry.path();
1142                let ext = path.extension().and_then(|s| s.to_str());
1143                if ext == Some("ts") || ext == Some("js") {
1144                    tracing::debug!(
1145                        "load_plugins_from_dir_internal: attempting to load {:?}",
1146                        path
1147                    );
1148                    if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1149                    {
1150                        let err = format!("Failed to load {:?}: {}", path, e);
1151                        tracing::error!("{}", err);
1152                        errors.push(err);
1153                    }
1154                }
1155            }
1156
1157            tracing::debug!(
1158                "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1159                dir,
1160                errors.len()
1161            );
1162        }
1163        Err(e) => {
1164            let err = format!("Failed to read plugin directory: {}", e);
1165            tracing::error!("{}", err);
1166            errors.push(err);
1167        }
1168    }
1169
1170    errors
1171}
1172
1173/// Load all plugins from a directory with config support
1174/// Returns (errors, discovered_plugins) where discovered_plugins contains all
1175/// found plugin files with their configs (respecting enabled state from provided configs)
1176async fn load_plugins_from_dir_with_config_internal(
1177    runtime: Rc<RefCell<QuickJsBackend>>,
1178    plugins: &mut HashMap<String, TsPluginInfo>,
1179    dir: &Path,
1180    plugin_configs: &HashMap<String, PluginConfig>,
1181) -> (Vec<String>, HashMap<String, PluginConfig>) {
1182    tracing::debug!(
1183        "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1184        dir
1185    );
1186    let mut errors = Vec::new();
1187    let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1188
1189    if !dir.exists() {
1190        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1191        return (errors, discovered_plugins);
1192    }
1193
1194    // First pass: scan directory and collect all plugin files
1195    let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1196    match std::fs::read_dir(dir) {
1197        Ok(entries) => {
1198            for entry in entries.flatten() {
1199                let path = entry.path();
1200                let ext = path.extension().and_then(|s| s.to_str());
1201                if ext == Some("ts") || ext == Some("js") {
1202                    // Skip .i18n.json files (they're not plugins)
1203                    if path.to_string_lossy().contains(".i18n.") {
1204                        continue;
1205                    }
1206                    // Get plugin name from filename (without extension)
1207                    let plugin_name = path
1208                        .file_stem()
1209                        .and_then(|s| s.to_str())
1210                        .unwrap_or("unknown")
1211                        .to_string();
1212                    plugin_files.push((plugin_name, path));
1213                }
1214            }
1215        }
1216        Err(e) => {
1217            let err = format!("Failed to read plugin directory: {}", e);
1218            tracing::error!("{}", err);
1219            errors.push(err);
1220            return (errors, discovered_plugins);
1221        }
1222    }
1223
1224    // Second pass: build discovered_plugins map and load enabled plugins
1225    for (plugin_name, path) in plugin_files {
1226        // Check if we have an existing config for this plugin
1227        let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1228            // Use existing config but ensure path is set
1229            PluginConfig {
1230                enabled: existing_config.enabled,
1231                path: Some(path.clone()),
1232            }
1233        } else {
1234            // Create new config with default enabled = true
1235            PluginConfig::new_with_path(path.clone())
1236        };
1237
1238        // Add to discovered plugins
1239        discovered_plugins.insert(plugin_name.clone(), config.clone());
1240
1241        // Only load if enabled
1242        if config.enabled {
1243            tracing::debug!(
1244                "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1245                plugin_name
1246            );
1247            if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1248                let err = format!("Failed to load {:?}: {}", path, e);
1249                tracing::error!("{}", err);
1250                errors.push(err);
1251            }
1252        } else {
1253            tracing::info!(
1254                "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1255                plugin_name
1256            );
1257        }
1258    }
1259
1260    tracing::debug!(
1261        "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1262        discovered_plugins.len(),
1263        errors.len()
1264    );
1265
1266    (errors, discovered_plugins)
1267}
1268
1269/// Unload a plugin
1270fn unload_plugin_internal(
1271    runtime: Rc<RefCell<QuickJsBackend>>,
1272    plugins: &mut HashMap<String, TsPluginInfo>,
1273    name: &str,
1274) -> Result<()> {
1275    if plugins.remove(name).is_some() {
1276        tracing::info!("Unloading TypeScript plugin: {}", name);
1277
1278        // Unregister i18n strings
1279        runtime
1280            .borrow_mut()
1281            .services
1282            .unregister_plugin_strings(name);
1283
1284        // Remove all commands registered by this plugin
1285        runtime
1286            .borrow()
1287            .services
1288            .unregister_commands_by_plugin(name);
1289
1290        Ok(())
1291    } else {
1292        Err(anyhow!("Plugin '{}' not found", name))
1293    }
1294}
1295
1296/// Reload a plugin
1297async fn reload_plugin_internal(
1298    runtime: Rc<RefCell<QuickJsBackend>>,
1299    plugins: &mut HashMap<String, TsPluginInfo>,
1300    name: &str,
1301) -> Result<()> {
1302    let path = plugins
1303        .get(name)
1304        .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1305        .path
1306        .clone();
1307
1308    unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1309    load_plugin_internal(runtime, plugins, &path).await?;
1310
1311    Ok(())
1312}
1313
#[cfg(test)]
mod tests {
    use super::*;
    use fresh_core::hooks::hook_args_to_json;

    /// A value sent on a oneshot channel is received unchanged.
    #[test]
    fn test_oneshot_channel() {
        let (sender, receiver) = oneshot::channel::<i32>();
        assert!(sender.send(42).is_ok());
        assert_eq!(receiver.recv().unwrap(), 42);
    }

    /// `EditorInitialized` carries no data and serializes to an empty object.
    #[test]
    fn test_hook_args_to_json_editor_initialized() {
        let serialized = hook_args_to_json(&HookArgs::EditorInitialized).unwrap();
        assert_eq!(serialized, serde_json::json!({}));
    }

    /// `PromptChanged` fields appear as top-level keys of the JSON object.
    #[test]
    fn test_hook_args_to_json_prompt_changed() {
        let prompt_changed = HookArgs::PromptChanged {
            prompt_type: "search".to_string(),
            input: "test".to_string(),
        };
        let serialized = hook_args_to_json(&prompt_changed).unwrap();
        assert_eq!(serialized["prompt_type"], "search");
        assert_eq!(serialized["input"], "test");
    }
}