Skip to main content

fresh_plugin_runtime/
thread.rs

1//! Plugin Thread: Dedicated thread for TypeScript plugin execution
2//!
3//! This module implements a dedicated thread architecture for plugin execution,
4//! using QuickJS as the JavaScript runtime with oxc for TypeScript transpilation.
5//!
6//! Architecture:
7//! - Main thread (UI) sends requests to plugin thread via channel
8//! - Plugin thread owns QuickJS runtime and persistent tokio runtime
9//! - Results are sent back via the existing PluginCommand channel
10//! - Async operations complete naturally without runtime destruction
11
12use crate::backend::quickjs_backend::{PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25// Re-export PluginConfig from fresh-core
26pub use fresh_core::config::PluginConfig;
27
/// Request messages sent to the plugin thread.
///
/// Variants that carry a `response` field hand the plugin thread a one-shot
/// sender on which the outcome is reported; the blocking methods on
/// `PluginThreadHandle` wait on the matching receiver. Variants without a
/// `response` field are fire-and-forget.
#[derive(Debug)]
pub enum PluginRequest {
    /// Load a plugin from a file
    LoadPlugin {
        /// Path of the plugin file to load
        path: PathBuf,
        /// Receives the result of the load
        response: oneshot::Sender<Result<()>>,
    },

    /// Resolve an async callback with a result (for async operations like SpawnProcess, Delay)
    ResolveCallback {
        /// Identifier of the JS callback being resolved
        callback_id: fresh_core::api::JsCallbackId,
        /// Result value, already serialized as JSON text
        result_json: String,
    },

    /// Reject an async callback with an error
    RejectCallback {
        /// Identifier of the JS callback being rejected
        callback_id: fresh_core::api::JsCallbackId,
        /// Error message passed to the callback
        error: String,
    },

    /// Load all plugins from a directory
    LoadPluginsFromDir {
        /// Directory to scan for plugins
        dir: PathBuf,
        /// Receives the list of error messages produced while loading
        response: oneshot::Sender<Vec<String>>,
    },

    /// Load all plugins from a directory with config support
    /// Returns (errors, discovered_plugins) where discovered_plugins contains
    /// all found plugins with their paths and enabled status
    LoadPluginsFromDirWithConfig {
        /// Directory to scan for plugins
        dir: PathBuf,
        /// Existing per-plugin configuration, keyed by plugin name
        plugin_configs: HashMap<String, PluginConfig>,
        /// Receives `(errors, discovered_plugins)`
        response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
    },

    /// Unload a plugin by name
    UnloadPlugin {
        /// Name of the plugin to unload
        name: String,
        /// Receives the result of the unload
        response: oneshot::Sender<Result<()>>,
    },

    /// Reload a plugin by name
    ReloadPlugin {
        /// Name of the plugin to reload
        name: String,
        /// Receives the result of the reload
        response: oneshot::Sender<Result<()>>,
    },

    /// Execute a plugin action
    ExecuteAction {
        /// Name of the action to start
        action_name: String,
        /// Receives the result of starting the action
        response: oneshot::Sender<Result<()>>,
    },

    /// Run a hook (fire-and-forget, no response needed)
    RunHook { hook_name: String, args: HookArgs },

    /// Check if any handlers are registered for a hook
    HasHookHandlers {
        /// Name of the hook to query
        hook_name: String,
        /// Receives `true` when at least one handler is registered
        response: oneshot::Sender<bool>,
    },

    /// List all loaded plugins
    ListPlugins {
        /// Receives the information records of the loaded plugins
        response: oneshot::Sender<Vec<TsPluginInfo>>,
    },

    /// Shutdown the plugin thread
    Shutdown,
}
99
/// Minimal one-shot channel built on a bounded `std::sync::mpsc` channel.
///
/// `Sender::send` consumes the sender, so at most one value ever travels
/// through a channel. The underlying buffer has capacity 1, which means that
/// single `send` never blocks waiting for the receiver.
pub mod oneshot {
    use std::fmt;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Sending half of a one-shot channel.
    pub struct Sender<T>(mpsc::SyncSender<T>);

    /// Receiving half of a one-shot channel.
    pub struct Receiver<T>(mpsc::Receiver<T>);

    // Hand-written so that `T` does not need to implement `Debug`.
    impl<T> fmt::Debug for Sender<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Sender").finish()
        }
    }

    // Hand-written so that `T` does not need to implement `Debug`.
    impl<T> fmt::Debug for Receiver<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Receiver").finish()
        }
    }

    impl<T> Sender<T> {
        /// Send the value, consuming the sender.
        ///
        /// # Errors
        /// Returns the value back if the receiver has already been dropped.
        pub fn send(self, value: T) -> Result<(), T> {
            self.0.send(value).map_err(|mpsc::SendError(unsent)| unsent)
        }
    }

    impl<T> Receiver<T> {
        /// Block until the value arrives, consuming the receiver.
        ///
        /// # Errors
        /// Fails if the sender was dropped without sending.
        pub fn recv(self) -> Result<T, mpsc::RecvError> {
            self.0.recv()
        }

        /// Block until the value arrives or `timeout` elapses, consuming the receiver.
        ///
        /// Because the receiver is consumed, a timed-out wait cannot be retried.
        ///
        /// # Errors
        /// Fails on timeout or if the sender was dropped without sending.
        pub fn recv_timeout(self, timeout: Duration) -> Result<T, mpsc::RecvTimeoutError> {
            self.0.recv_timeout(timeout)
        }

        /// Poll for the value without blocking; may be called repeatedly.
        ///
        /// # Errors
        /// Fails if no value is available yet or the sender was dropped without sending.
        pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
            self.0.try_recv()
        }
    }

    /// Create a connected sender/receiver pair.
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        let (tx, rx) = mpsc::sync_channel(1);
        (Sender(tx), Receiver(rx))
    }
}
150
/// Handle to the plugin thread for sending requests
///
/// Dropping the handle calls `shutdown`, which closes the request channel and
/// joins the thread.
pub struct PluginThreadHandle {
    /// Channel to send requests to the plugin thread
    /// Wrapped in Option so we can drop it to signal shutdown
    request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,

    /// Thread join handle
    /// `None` once the thread has been joined (by `shutdown` or `check_thread_health`)
    thread_handle: Option<JoinHandle<()>>,

    /// State snapshot handle for editor to update
    state_snapshot: Arc<RwLock<EditorStateSnapshot>>,

    /// Pending response senders for async operations (shared with runtime)
    pending_responses: PendingResponses,

    /// Receiver for plugin commands (polled by editor directly)
    command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
}
169
170impl PluginThreadHandle {
171    /// Create a new plugin thread and return its handle
172    pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
173        tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
174
175        // Create channel for plugin commands
176        let (command_sender, command_receiver) = std::sync::mpsc::channel();
177
178        // Create editor state snapshot for query API
179        let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
180
181        // Create pending responses map (shared between handle and runtime)
182        let pending_responses: PendingResponses =
183            Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
184        let thread_pending_responses = Arc::clone(&pending_responses);
185
186        // Create channel for requests (unbounded allows sync send, async recv)
187        let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
188
189        // Clone state snapshot for the thread
190        let thread_state_snapshot = Arc::clone(&state_snapshot);
191
192        // Spawn the plugin thread
193        tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
194        let thread_handle = thread::spawn(move || {
195            tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
196            // Create tokio runtime for the plugin thread
197            let rt = match tokio::runtime::Builder::new_current_thread()
198                .enable_all()
199                .build()
200            {
201                Ok(rt) => {
202                    tracing::debug!("Plugin thread: tokio runtime created successfully");
203                    rt
204                }
205                Err(e) => {
206                    tracing::error!("Failed to create plugin thread runtime: {}", e);
207                    return;
208                }
209            };
210
211            // Create QuickJS runtime with state
212            tracing::debug!("Plugin thread: creating QuickJS runtime");
213            let runtime = match QuickJsBackend::with_state_and_responses(
214                Arc::clone(&thread_state_snapshot),
215                command_sender,
216                thread_pending_responses,
217                services.clone(),
218            ) {
219                Ok(rt) => {
220                    tracing::debug!("Plugin thread: QuickJS runtime created successfully");
221                    rt
222                }
223                Err(e) => {
224                    tracing::error!("Failed to create QuickJS runtime: {}", e);
225                    return;
226                }
227            };
228
229            // Create internal manager state
230            let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
231
232            // Run the event loop with a LocalSet to allow concurrent task execution
233            tracing::debug!("Plugin thread: starting event loop with LocalSet");
234            let local = tokio::task::LocalSet::new();
235            local.block_on(&rt, async {
236                // Wrap runtime in RefCell for interior mutability during concurrent operations
237                let runtime = Rc::new(RefCell::new(runtime));
238                tracing::debug!("Plugin thread: entering plugin_thread_loop");
239                plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
240            });
241
242            tracing::info!("Plugin thread shutting down");
243        });
244
245        tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
246        tracing::info!("Plugin thread spawned");
247
248        Ok(Self {
249            request_sender: Some(request_sender),
250            thread_handle: Some(thread_handle),
251            state_snapshot,
252            pending_responses,
253            command_receiver,
254        })
255    }
256
257    /// Check if the plugin thread is still alive
258    pub fn is_alive(&self) -> bool {
259        self.thread_handle
260            .as_ref()
261            .map(|h| !h.is_finished())
262            .unwrap_or(false)
263    }
264
265    /// Check thread health and panic if the plugin thread died due to a panic.
266    /// This propagates plugin thread panics to the calling thread.
267    /// Call this periodically to detect plugin thread failures.
268    pub fn check_thread_health(&mut self) {
269        if let Some(handle) = &self.thread_handle {
270            if handle.is_finished() {
271                tracing::error!(
272                    "check_thread_health: plugin thread is finished, checking for panic"
273                );
274                // Thread finished - take ownership and check result
275                if let Some(handle) = self.thread_handle.take() {
276                    match handle.join() {
277                        Ok(()) => {
278                            tracing::warn!("Plugin thread exited normally (unexpected)");
279                        }
280                        Err(panic_payload) => {
281                            // Re-panic with the original panic message to propagate it
282                            std::panic::resume_unwind(panic_payload);
283                        }
284                    }
285                }
286            }
287        }
288    }
289
290    /// Deliver a response to a pending async operation in the plugin
291    ///
292    /// This is called by the editor after processing a command that requires a response.
293    pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
294        // First try to find a pending Rust request (oneshot channel)
295        if respond_to_pending(&self.pending_responses, response.clone()) {
296            return;
297        }
298
299        // If not found, it must be a JS callback
300        use fresh_core::api::{JsCallbackId, PluginResponse};
301
302        match response {
303            PluginResponse::VirtualBufferCreated {
304                request_id,
305                buffer_id,
306                split_id,
307            } => {
308                // Return an object with bufferId and splitId (camelCase for JS)
309                let result = serde_json::json!({
310                    "bufferId": buffer_id.0,
311                    "splitId": split_id.map(|s| s.0)
312                });
313                self.resolve_callback(JsCallbackId(request_id), result.to_string());
314            }
315            PluginResponse::LspRequest { request_id, result } => match result {
316                Ok(value) => {
317                    self.resolve_callback(JsCallbackId(request_id), value.to_string());
318                }
319                Err(e) => {
320                    self.reject_callback(JsCallbackId(request_id), e);
321                }
322            },
323            PluginResponse::HighlightsComputed { request_id, spans } => {
324                let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
325                self.resolve_callback(JsCallbackId(request_id), result);
326            }
327            PluginResponse::BufferText { request_id, text } => match text {
328                Ok(content) => {
329                    // JSON stringify the content string
330                    let result =
331                        serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
332                    self.resolve_callback(JsCallbackId(request_id), result);
333                }
334                Err(e) => {
335                    self.reject_callback(JsCallbackId(request_id), e);
336                }
337            },
338            PluginResponse::CompositeBufferCreated {
339                request_id,
340                buffer_id,
341            } => {
342                // Return just the buffer_id number, not an object
343                self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
344            }
345            PluginResponse::LineStartPosition {
346                request_id,
347                position,
348            } => {
349                // Return the position as a number or null
350                let result =
351                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
352                self.resolve_callback(JsCallbackId(request_id), result);
353            }
354            PluginResponse::LineEndPosition {
355                request_id,
356                position,
357            } => {
358                // Return the position as a number or null
359                let result =
360                    serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
361                self.resolve_callback(JsCallbackId(request_id), result);
362            }
363            PluginResponse::BufferLineCount { request_id, count } => {
364                // Return the count as a number or null
365                let result = serde_json::to_string(&count).unwrap_or_else(|_| "null".to_string());
366                self.resolve_callback(JsCallbackId(request_id), result);
367            }
368            PluginResponse::TerminalCreated {
369                request_id,
370                buffer_id,
371                terminal_id,
372                split_id,
373            } => {
374                let result = serde_json::json!({
375                    "bufferId": buffer_id.0,
376                    "terminalId": terminal_id.0,
377                    "splitId": split_id.map(|s| s.0)
378                });
379                self.resolve_callback(JsCallbackId(request_id), result.to_string());
380            }
381            PluginResponse::SplitByLabel {
382                request_id,
383                split_id,
384            } => {
385                let result = serde_json::to_string(&split_id.map(|s| s.0))
386                    .unwrap_or_else(|_| "null".to_string());
387                self.resolve_callback(JsCallbackId(request_id), result);
388            }
389        }
390    }
391
392    /// Load a plugin from a file (blocking)
393    pub fn load_plugin(&self, path: &Path) -> Result<()> {
394        let (tx, rx) = oneshot::channel();
395        self.request_sender
396            .as_ref()
397            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
398            .send(PluginRequest::LoadPlugin {
399                path: path.to_path_buf(),
400                response: tx,
401            })
402            .map_err(|_| anyhow!("Plugin thread not responding"))?;
403
404        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
405    }
406
407    /// Load all plugins from a directory (blocking)
408    pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
409        let (tx, rx) = oneshot::channel();
410        let Some(sender) = self.request_sender.as_ref() else {
411            return vec!["Plugin thread shut down".to_string()];
412        };
413        if sender
414            .send(PluginRequest::LoadPluginsFromDir {
415                dir: dir.to_path_buf(),
416                response: tx,
417            })
418            .is_err()
419        {
420            return vec!["Plugin thread not responding".to_string()];
421        }
422
423        rx.recv()
424            .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
425    }
426
427    /// Load all plugins from a directory with config support (blocking)
428    /// Returns (errors, discovered_plugins) where discovered_plugins is a map of
429    /// plugin name -> PluginConfig with paths populated.
430    pub fn load_plugins_from_dir_with_config(
431        &self,
432        dir: &Path,
433        plugin_configs: &HashMap<String, PluginConfig>,
434    ) -> (Vec<String>, HashMap<String, PluginConfig>) {
435        let (tx, rx) = oneshot::channel();
436        let Some(sender) = self.request_sender.as_ref() else {
437            return (vec!["Plugin thread shut down".to_string()], HashMap::new());
438        };
439        if sender
440            .send(PluginRequest::LoadPluginsFromDirWithConfig {
441                dir: dir.to_path_buf(),
442                plugin_configs: plugin_configs.clone(),
443                response: tx,
444            })
445            .is_err()
446        {
447            return (
448                vec!["Plugin thread not responding".to_string()],
449                HashMap::new(),
450            );
451        }
452
453        rx.recv()
454            .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
455    }
456
457    /// Unload a plugin (blocking)
458    pub fn unload_plugin(&self, name: &str) -> Result<()> {
459        let (tx, rx) = oneshot::channel();
460        self.request_sender
461            .as_ref()
462            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
463            .send(PluginRequest::UnloadPlugin {
464                name: name.to_string(),
465                response: tx,
466            })
467            .map_err(|_| anyhow!("Plugin thread not responding"))?;
468
469        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
470    }
471
472    /// Reload a plugin (blocking)
473    pub fn reload_plugin(&self, name: &str) -> Result<()> {
474        let (tx, rx) = oneshot::channel();
475        self.request_sender
476            .as_ref()
477            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
478            .send(PluginRequest::ReloadPlugin {
479                name: name.to_string(),
480                response: tx,
481            })
482            .map_err(|_| anyhow!("Plugin thread not responding"))?;
483
484        rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
485    }
486
487    /// Execute a plugin action (non-blocking)
488    ///
489    /// Returns a receiver that will receive the result when the action completes.
490    /// The caller should poll this while processing commands to avoid deadlock.
491    pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
492        tracing::trace!("execute_action_async: starting action '{}'", action_name);
493        let (tx, rx) = oneshot::channel();
494        self.request_sender
495            .as_ref()
496            .ok_or_else(|| anyhow!("Plugin thread shut down"))?
497            .send(PluginRequest::ExecuteAction {
498                action_name: action_name.to_string(),
499                response: tx,
500            })
501            .map_err(|_| anyhow!("Plugin thread not responding"))?;
502
503        tracing::trace!("execute_action_async: request sent for '{}'", action_name);
504        Ok(rx)
505    }
506
507    /// Run a hook (non-blocking, fire-and-forget)
508    ///
509    /// This is the key improvement: hooks are now non-blocking.
510    /// The plugin thread will execute them asynchronously and
511    /// any results will come back via the PluginCommand channel.
512    pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
513        if let Some(sender) = self.request_sender.as_ref() {
514            let _ = sender.send(PluginRequest::RunHook {
515                hook_name: hook_name.to_string(),
516                args,
517            });
518        }
519    }
520
521    /// Check if any handlers are registered for a hook (blocking)
522    pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
523        let (tx, rx) = oneshot::channel();
524        let Some(sender) = self.request_sender.as_ref() else {
525            return false;
526        };
527        if sender
528            .send(PluginRequest::HasHookHandlers {
529                hook_name: hook_name.to_string(),
530                response: tx,
531            })
532            .is_err()
533        {
534            return false;
535        }
536
537        rx.recv().unwrap_or(false)
538    }
539
540    /// List all loaded plugins (blocking)
541    pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
542        let (tx, rx) = oneshot::channel();
543        let Some(sender) = self.request_sender.as_ref() else {
544            return vec![];
545        };
546        if sender
547            .send(PluginRequest::ListPlugins { response: tx })
548            .is_err()
549        {
550            return vec![];
551        }
552
553        rx.recv().unwrap_or_default()
554    }
555
556    /// Process pending plugin commands (non-blocking)
557    ///
558    /// Returns immediately with any pending commands by polling the command queue directly.
559    /// This does not require the plugin thread to respond, avoiding deadlocks.
560    pub fn process_commands(&mut self) -> Vec<PluginCommand> {
561        let mut commands = Vec::new();
562        while let Ok(cmd) = self.command_receiver.try_recv() {
563            commands.push(cmd);
564        }
565        commands
566    }
567
568    /// Process commands, blocking until `HookCompleted` for the given hook arrives.
569    ///
570    /// After the render loop fires a hook like `lines_changed`, the plugin thread
571    /// processes it and sends back commands (AddConceal, etc.) followed by a
572    /// `HookCompleted` sentinel. This method waits for that sentinel so the
573    /// render has all conceal/overlay updates before painting the frame.
574    ///
575    /// Returns all non-sentinel commands collected while waiting.
576    /// Falls back to non-blocking drain if the timeout expires.
577    pub fn process_commands_until_hook_completed(
578        &mut self,
579        hook_name: &str,
580        timeout: std::time::Duration,
581    ) -> Vec<PluginCommand> {
582        let mut commands = Vec::new();
583        let deadline = std::time::Instant::now() + timeout;
584
585        loop {
586            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
587            if remaining.is_zero() {
588                // Timeout: drain whatever is available
589                while let Ok(cmd) = self.command_receiver.try_recv() {
590                    if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
591                        commands.push(cmd);
592                    }
593                }
594                break;
595            }
596
597            match self.command_receiver.recv_timeout(remaining) {
598                Ok(PluginCommand::HookCompleted {
599                    hook_name: ref name,
600                }) if name == hook_name => {
601                    // Got our sentinel — drain any remaining commands
602                    while let Ok(cmd) = self.command_receiver.try_recv() {
603                        if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
604                            commands.push(cmd);
605                        }
606                    }
607                    break;
608                }
609                Ok(PluginCommand::HookCompleted { .. }) => {
610                    // Sentinel for a different hook, keep waiting
611                    continue;
612                }
613                Ok(cmd) => {
614                    commands.push(cmd);
615                }
616                Err(_) => {
617                    // Timeout or disconnected
618                    break;
619                }
620            }
621        }
622
623        commands
624    }
625
626    /// Get the state snapshot handle for editor to update
627    pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
628        Arc::clone(&self.state_snapshot)
629    }
630
631    /// Shutdown the plugin thread
632    pub fn shutdown(&mut self) {
633        tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
634
635        // Drop all pending response senders - this wakes up any plugin code waiting for responses
636        // by causing their oneshot receivers to return an error
637        if let Ok(mut pending) = self.pending_responses.lock() {
638            if !pending.is_empty() {
639                tracing::warn!(
640                    "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
641                    pending.len(),
642                    pending.keys().collect::<Vec<_>>()
643                );
644                pending.clear(); // Drop all senders, waking up waiting receivers
645            }
646        }
647
648        // First send a Shutdown request to allow clean processing of pending work
649        if let Some(sender) = self.request_sender.as_ref() {
650            tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
651            let _ = sender.send(PluginRequest::Shutdown);
652        }
653
654        // Then drop the sender to close the channel - this reliably wakes the receiver
655        // even when it's parked in a tokio LocalSet (the Shutdown message above may not wake it)
656        tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
657        self.request_sender.take();
658
659        if let Some(handle) = self.thread_handle.take() {
660            tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
661            let _ = handle.join();
662            tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
663        }
664
665        tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
666    }
667
668    /// Resolve an async callback in the plugin runtime
669    /// Called by the app when async operations (SpawnProcess, Delay) complete
670    pub fn resolve_callback(
671        &self,
672        callback_id: fresh_core::api::JsCallbackId,
673        result_json: String,
674    ) {
675        if let Some(sender) = self.request_sender.as_ref() {
676            let _ = sender.send(PluginRequest::ResolveCallback {
677                callback_id,
678                result_json,
679            });
680        }
681    }
682
683    /// Reject an async callback in the plugin runtime
684    /// Called by the app when async operations fail
685    pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
686        if let Some(sender) = self.request_sender.as_ref() {
687            let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
688        }
689    }
690}
691
/// Dropping the handle shuts the plugin thread down.
impl Drop for PluginThreadHandle {
    /// Runs `shutdown`, which joins the plugin thread, so dropping the handle
    /// blocks until that thread has exited.
    fn drop(&mut self) {
        self.shutdown();
    }
}
697
698fn respond_to_pending(
699    pending_responses: &PendingResponses,
700    response: fresh_core::api::PluginResponse,
701) -> bool {
702    let request_id = match &response {
703        fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
704        fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
705        fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
706        fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
707        fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
708        fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
709        fresh_core::api::PluginResponse::LineEndPosition { request_id, .. } => *request_id,
710        fresh_core::api::PluginResponse::BufferLineCount { request_id, .. } => *request_id,
711        fresh_core::api::PluginResponse::TerminalCreated { request_id, .. } => *request_id,
712        fresh_core::api::PluginResponse::SplitByLabel { request_id, .. } => *request_id,
713    };
714
715    let sender = {
716        let mut pending = pending_responses.lock().unwrap();
717        pending.remove(&request_id)
718    };
719
720    if let Some(tx) = sender {
721        let _ = tx.send(response);
722        true
723    } else {
724        false
725    }
726}
727
/// Unit tests for `respond_to_pending`.
#[cfg(test)]
mod plugin_thread_tests {
    use super::*;
    use fresh_core::api::PluginResponse;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    // This explicit import takes precedence over the file's own `oneshot` module
    // pulled in by the glob import above.
    use tokio::sync::oneshot;

    /// A pending LSP request receives its response and is removed from the map.
    #[test]
    fn respond_to_pending_sends_lsp_response() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(123, tx);

        let handled = respond_to_pending(
            &pending,
            PluginResponse::LspRequest {
                request_id: 123,
                result: Ok(json!({ "key": "value" })),
            },
        );
        assert!(handled, "a registered request must be reported as handled");

        let response = rx.try_recv().expect("expected response");
        match response {
            PluginResponse::LspRequest { result, .. } => {
                assert_eq!(result.unwrap(), json!({ "key": "value" }));
            }
            _ => panic!("unexpected variant"),
        }

        assert!(pending.lock().unwrap().is_empty());
    }

    /// A pending virtual-buffer request receives its response and is removed from the map.
    #[test]
    fn respond_to_pending_handles_virtual_buffer_created() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (tx, mut rx) = oneshot::channel();
        pending.lock().unwrap().insert(456, tx);

        let handled = respond_to_pending(
            &pending,
            PluginResponse::VirtualBufferCreated {
                request_id: 456,
                buffer_id: fresh_core::BufferId(7),
                split_id: Some(fresh_core::SplitId(1)),
            },
        );
        assert!(handled, "a registered request must be reported as handled");

        let response = rx.try_recv().expect("expected response");
        match response {
            PluginResponse::VirtualBufferCreated { buffer_id, .. } => {
                assert_eq!(buffer_id.0, 7);
            }
            _ => panic!("unexpected variant"),
        }

        assert!(pending.lock().unwrap().is_empty());
    }

    /// With nothing registered for the request id the function reports `false`,
    /// which is what lets `deliver_response` fall through to the JS callback path.
    #[test]
    fn respond_to_pending_returns_false_when_nothing_is_pending() {
        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));

        let handled = respond_to_pending(
            &pending,
            PluginResponse::LspRequest {
                request_id: 999,
                result: Ok(json!(null)),
            },
        );

        assert!(!handled);
        assert!(pending.lock().unwrap().is_empty());
    }
}
788
789/// Main loop for the plugin thread
790///
791/// Uses `tokio::select!` to interleave request handling with periodic event loop
792/// polling. This allows long-running promises (like process spawns) to make progress
793/// even when no requests are coming in, preventing the UI from getting stuck.
794async fn plugin_thread_loop(
795    runtime: Rc<RefCell<QuickJsBackend>>,
796    plugins: &mut HashMap<String, TsPluginInfo>,
797    mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
798) {
799    tracing::info!("Plugin thread event loop started");
800
801    // Interval for polling the JS event loop when there's pending work
802    let poll_interval = Duration::from_millis(1);
803    let mut has_pending_work = false;
804
805    loop {
806        // Check for fatal JS errors (e.g., unhandled promise rejections in test mode)
807        // These are set via set_fatal_js_error() because panicking inside FFI callbacks
808        // is caught by rquickjs and doesn't terminate the thread.
809        if crate::backend::has_fatal_js_error() {
810            if let Some(error_msg) = crate::backend::take_fatal_js_error() {
811                tracing::error!(
812                    "Fatal JS error detected, terminating plugin thread: {}",
813                    error_msg
814                );
815                panic!("Fatal plugin error: {}", error_msg);
816            }
817        }
818
819        tokio::select! {
820            biased; // Prefer handling requests over polling
821
822            request = request_receiver.recv() => {
823                match request {
824                    Some(PluginRequest::ExecuteAction {
825                        action_name,
826                        response,
827                    }) => {
828                        // Start the action without blocking - this allows us to process
829                        // ResolveCallback requests that the action may be waiting for.
830                        let result = runtime.borrow_mut().start_action(&action_name);
831                        let _ = response.send(result);
832                        has_pending_work = true; // Action may have started async work
833                    }
834                    Some(request) => {
835                        let should_shutdown =
836                            handle_request(request, Rc::clone(&runtime), plugins).await;
837
838                        if should_shutdown {
839                            break;
840                        }
841                        has_pending_work = true; // Request may have started async work
842                    }
843                    None => {
844                        // Channel closed
845                        tracing::info!("Plugin thread request channel closed");
846                        break;
847                    }
848                }
849            }
850
851            // Poll the JS event loop periodically to make progress on pending promises
852            _ = tokio::time::sleep(poll_interval), if has_pending_work => {
853                has_pending_work = runtime.borrow_mut().poll_event_loop_once();
854            }
855        }
856    }
857}
858
859/// Execute an action while processing incoming hook requests concurrently.
860///
861/// This prevents deadlock when an action awaits a response from the main thread
862/// while the main thread is waiting for a blocking hook to complete.
863///
864/// # Safety (clippy::await_holding_refcell_ref)
865/// The RefCell borrow held across await is safe because:
866/// - This runs on a single-threaded tokio runtime (no parallel task execution)
867/// - No spawn_local calls exist that could create concurrent access to `runtime`
868/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
869
870/// Run a hook with Rc<RefCell<QuickJsBackend>>
871///
872/// # Safety (clippy::await_holding_refcell_ref)
873/// The RefCell borrow held across await is safe because:
874/// - This runs on a single-threaded tokio runtime (no parallel task execution)
875/// - No spawn_local calls exist that could create concurrent access to `runtime`
876/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
877#[allow(clippy::await_holding_refcell_ref)]
878async fn run_hook_internal_rc(
879    runtime: Rc<RefCell<QuickJsBackend>>,
880    hook_name: &str,
881    args: &HookArgs,
882) -> Result<()> {
883    // Convert HookArgs to JSON using hook_args_to_json which produces flat JSON
884    // (not enum-tagged JSON from serde's default Serialize)
885    let json_start = std::time::Instant::now();
886    let json_string = fresh_core::hooks::hook_args_to_json(args)?;
887    let json_data: serde_json::Value = serde_json::from_str(&json_string)?;
888    tracing::trace!(
889        hook = hook_name,
890        json_ms = json_start.elapsed().as_micros(),
891        "hook args serialized"
892    );
893
894    // Emit to TypeScript handlers
895    let emit_start = std::time::Instant::now();
896    runtime.borrow_mut().emit(hook_name, &json_data).await?;
897    tracing::trace!(
898        hook = hook_name,
899        emit_ms = emit_start.elapsed().as_millis(),
900        "emit completed"
901    );
902
903    Ok(())
904}
905
906/// Handle a single request in the plugin thread
907async fn handle_request(
908    request: PluginRequest,
909    runtime: Rc<RefCell<QuickJsBackend>>,
910    plugins: &mut HashMap<String, TsPluginInfo>,
911) -> bool {
912    match request {
913        PluginRequest::LoadPlugin { path, response } => {
914            let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
915            let _ = response.send(result);
916        }
917
918        PluginRequest::LoadPluginsFromDir { dir, response } => {
919            let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
920            let _ = response.send(errors);
921        }
922
923        PluginRequest::LoadPluginsFromDirWithConfig {
924            dir,
925            plugin_configs,
926            response,
927        } => {
928            let (errors, discovered) = load_plugins_from_dir_with_config_internal(
929                Rc::clone(&runtime),
930                plugins,
931                &dir,
932                &plugin_configs,
933            )
934            .await;
935            let _ = response.send((errors, discovered));
936        }
937
938        PluginRequest::UnloadPlugin { name, response } => {
939            let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
940            let _ = response.send(result);
941        }
942
943        PluginRequest::ReloadPlugin { name, response } => {
944            let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
945            let _ = response.send(result);
946        }
947
948        PluginRequest::ExecuteAction {
949            action_name,
950            response,
951        } => {
952            // This is handled in plugin_thread_loop with select! for concurrent processing
953            // If we get here, it's an unexpected state
954            tracing::error!(
955                "ExecuteAction should be handled in main loop, not here: {}",
956                action_name
957            );
958            let _ = response.send(Err(anyhow::anyhow!(
959                "Internal error: ExecuteAction in wrong handler"
960            )));
961        }
962
963        PluginRequest::RunHook { hook_name, args } => {
964            // Fire-and-forget hook execution
965            let hook_start = std::time::Instant::now();
966            // Use info level for prompt hooks to aid debugging
967            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
968                tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
969            } else {
970                tracing::trace!(hook = %hook_name, "RunHook request received");
971            }
972            if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
973                let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
974                tracing::error!("{}", error_msg);
975                // Surface the error to the UI
976                runtime.borrow_mut().send_status(error_msg);
977            }
978            // Send sentinel so the main thread can wait deterministically
979            // for all commands from this hook to be available.
980            runtime.borrow().send_hook_completed(hook_name.clone());
981            if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
982                tracing::info!(
983                    hook = %hook_name,
984                    elapsed_ms = hook_start.elapsed().as_millis(),
985                    "RunHook completed (prompt hook)"
986                );
987            } else {
988                tracing::trace!(
989                    hook = %hook_name,
990                    elapsed_ms = hook_start.elapsed().as_millis(),
991                    "RunHook completed"
992                );
993            }
994        }
995
996        PluginRequest::HasHookHandlers {
997            hook_name,
998            response,
999        } => {
1000            let has_handlers = runtime.borrow().has_handlers(&hook_name);
1001            let _ = response.send(has_handlers);
1002        }
1003
1004        PluginRequest::ListPlugins { response } => {
1005            let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
1006            let _ = response.send(plugin_list);
1007        }
1008
1009        PluginRequest::ResolveCallback {
1010            callback_id,
1011            result_json,
1012        } => {
1013            tracing::info!(
1014                "ResolveCallback: resolving callback_id={} with result_json={}",
1015                callback_id,
1016                result_json
1017            );
1018            runtime
1019                .borrow_mut()
1020                .resolve_callback(callback_id, &result_json);
1021            // resolve_callback now runs execute_pending_job() internally
1022            tracing::info!(
1023                "ResolveCallback: done resolving callback_id={}",
1024                callback_id
1025            );
1026        }
1027
1028        PluginRequest::RejectCallback { callback_id, error } => {
1029            runtime.borrow_mut().reject_callback(callback_id, &error);
1030            // reject_callback now runs execute_pending_job() internally
1031        }
1032
1033        PluginRequest::Shutdown => {
1034            tracing::info!("Plugin thread received shutdown request");
1035            return true;
1036        }
1037    }
1038
1039    false
1040}
1041
1042/// Load a plugin from a file
1043///
1044/// # Safety (clippy::await_holding_refcell_ref)
1045/// The RefCell borrow held across await is safe because:
1046/// - This runs on a single-threaded tokio runtime (no parallel task execution)
1047/// - No spawn_local calls exist that could create concurrent access to `runtime`
1048/// - The runtime Rc<RefCell<>> is never shared with other concurrent tasks
1049#[allow(clippy::await_holding_refcell_ref)]
1050async fn load_plugin_internal(
1051    runtime: Rc<RefCell<QuickJsBackend>>,
1052    plugins: &mut HashMap<String, TsPluginInfo>,
1053    path: &Path,
1054) -> Result<()> {
1055    let plugin_name = path
1056        .file_stem()
1057        .and_then(|s| s.to_str())
1058        .ok_or_else(|| anyhow!("Invalid plugin filename"))?
1059        .to_string();
1060
1061    tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
1062    tracing::debug!(
1063        "load_plugin_internal: starting module load for plugin '{}'",
1064        plugin_name
1065    );
1066
1067    // Load and execute the module, passing plugin name for command registration
1068    let path_str = path
1069        .to_str()
1070        .ok_or_else(|| anyhow!("Invalid path encoding"))?;
1071
1072    // Try to load accompanying .i18n.json file
1073    let i18n_path = path.with_extension("i18n.json");
1074    if i18n_path.exists() {
1075        if let Ok(content) = std::fs::read_to_string(&i18n_path) {
1076            if let Ok(strings) = serde_json::from_str::<
1077                std::collections::HashMap<String, std::collections::HashMap<String, String>>,
1078            >(&content)
1079            {
1080                runtime
1081                    .borrow_mut()
1082                    .services
1083                    .register_plugin_strings(&plugin_name, strings);
1084                tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
1085            }
1086        }
1087    }
1088
1089    let load_start = std::time::Instant::now();
1090    runtime
1091        .borrow_mut()
1092        .load_module_with_source(path_str, &plugin_name)
1093        .await?;
1094    let load_elapsed = load_start.elapsed();
1095
1096    tracing::debug!(
1097        "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
1098        plugin_name,
1099        load_elapsed
1100    );
1101
1102    // Store plugin info
1103    plugins.insert(
1104        plugin_name.clone(),
1105        TsPluginInfo {
1106            name: plugin_name.clone(),
1107            path: path.to_path_buf(),
1108            enabled: true,
1109        },
1110    );
1111
1112    tracing::debug!(
1113        "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1114        plugin_name,
1115        plugins.len()
1116    );
1117
1118    Ok(())
1119}
1120
1121/// Load all plugins from a directory
1122async fn load_plugins_from_dir_internal(
1123    runtime: Rc<RefCell<QuickJsBackend>>,
1124    plugins: &mut HashMap<String, TsPluginInfo>,
1125    dir: &Path,
1126) -> Vec<String> {
1127    tracing::debug!(
1128        "load_plugins_from_dir_internal: scanning directory {:?}",
1129        dir
1130    );
1131    let mut errors = Vec::new();
1132
1133    if !dir.exists() {
1134        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1135        return errors;
1136    }
1137
1138    // Scan directory for .ts and .js files
1139    match std::fs::read_dir(dir) {
1140        Ok(entries) => {
1141            for entry in entries.flatten() {
1142                let path = entry.path();
1143                let ext = path.extension().and_then(|s| s.to_str());
1144                if ext == Some("ts") || ext == Some("js") {
1145                    tracing::debug!(
1146                        "load_plugins_from_dir_internal: attempting to load {:?}",
1147                        path
1148                    );
1149                    if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1150                    {
1151                        let err = format!("Failed to load {:?}: {}", path, e);
1152                        tracing::error!("{}", err);
1153                        errors.push(err);
1154                    }
1155                }
1156            }
1157
1158            tracing::debug!(
1159                "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1160                dir,
1161                errors.len()
1162            );
1163        }
1164        Err(e) => {
1165            let err = format!("Failed to read plugin directory: {}", e);
1166            tracing::error!("{}", err);
1167            errors.push(err);
1168        }
1169    }
1170
1171    errors
1172}
1173
1174/// Load all plugins from a directory with config support
1175/// Returns (errors, discovered_plugins) where discovered_plugins contains all
1176/// found plugin files with their configs (respecting enabled state from provided configs)
1177async fn load_plugins_from_dir_with_config_internal(
1178    runtime: Rc<RefCell<QuickJsBackend>>,
1179    plugins: &mut HashMap<String, TsPluginInfo>,
1180    dir: &Path,
1181    plugin_configs: &HashMap<String, PluginConfig>,
1182) -> (Vec<String>, HashMap<String, PluginConfig>) {
1183    tracing::debug!(
1184        "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1185        dir
1186    );
1187    let mut errors = Vec::new();
1188    let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1189
1190    if !dir.exists() {
1191        tracing::warn!("Plugin directory does not exist: {:?}", dir);
1192        return (errors, discovered_plugins);
1193    }
1194
1195    // First pass: scan directory and collect all plugin files
1196    let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1197    match std::fs::read_dir(dir) {
1198        Ok(entries) => {
1199            for entry in entries.flatten() {
1200                let path = entry.path();
1201                let ext = path.extension().and_then(|s| s.to_str());
1202                if ext == Some("ts") || ext == Some("js") {
1203                    // Skip .i18n.json files (they're not plugins)
1204                    if path.to_string_lossy().contains(".i18n.") {
1205                        continue;
1206                    }
1207                    // Get plugin name from filename (without extension)
1208                    let plugin_name = path
1209                        .file_stem()
1210                        .and_then(|s| s.to_str())
1211                        .unwrap_or("unknown")
1212                        .to_string();
1213                    plugin_files.push((plugin_name, path));
1214                }
1215            }
1216        }
1217        Err(e) => {
1218            let err = format!("Failed to read plugin directory: {}", e);
1219            tracing::error!("{}", err);
1220            errors.push(err);
1221            return (errors, discovered_plugins);
1222        }
1223    }
1224
1225    // Second pass: build discovered_plugins map and load enabled plugins
1226    for (plugin_name, path) in plugin_files {
1227        // Check if we have an existing config for this plugin
1228        let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1229            // Use existing config but ensure path is set
1230            PluginConfig {
1231                enabled: existing_config.enabled,
1232                path: Some(path.clone()),
1233            }
1234        } else {
1235            // Create new config with default enabled = true
1236            PluginConfig::new_with_path(path.clone())
1237        };
1238
1239        // Add to discovered plugins
1240        discovered_plugins.insert(plugin_name.clone(), config.clone());
1241
1242        // Only load if enabled
1243        if config.enabled {
1244            tracing::debug!(
1245                "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1246                plugin_name
1247            );
1248            if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1249                let err = format!("Failed to load {:?}: {}", path, e);
1250                tracing::error!("{}", err);
1251                errors.push(err);
1252            }
1253        } else {
1254            tracing::info!(
1255                "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1256                plugin_name
1257            );
1258        }
1259    }
1260
1261    tracing::debug!(
1262        "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1263        discovered_plugins.len(),
1264        errors.len()
1265    );
1266
1267    (errors, discovered_plugins)
1268}
1269
1270/// Unload a plugin
1271fn unload_plugin_internal(
1272    runtime: Rc<RefCell<QuickJsBackend>>,
1273    plugins: &mut HashMap<String, TsPluginInfo>,
1274    name: &str,
1275) -> Result<()> {
1276    if plugins.remove(name).is_some() {
1277        tracing::info!("Unloading TypeScript plugin: {}", name);
1278
1279        // Unregister i18n strings
1280        runtime
1281            .borrow_mut()
1282            .services
1283            .unregister_plugin_strings(name);
1284
1285        // Remove all commands registered by this plugin
1286        runtime
1287            .borrow()
1288            .services
1289            .unregister_commands_by_plugin(name);
1290
1291        Ok(())
1292    } else {
1293        Err(anyhow!("Plugin '{}' not found", name))
1294    }
1295}
1296
1297/// Reload a plugin
1298async fn reload_plugin_internal(
1299    runtime: Rc<RefCell<QuickJsBackend>>,
1300    plugins: &mut HashMap<String, TsPluginInfo>,
1301    name: &str,
1302) -> Result<()> {
1303    let path = plugins
1304        .get(name)
1305        .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1306        .path
1307        .clone();
1308
1309    unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1310    load_plugin_internal(runtime, plugins, &path).await?;
1311
1312    Ok(())
1313}
1314
#[cfg(test)]
mod tests {
    use super::*;
    use fresh_core::hooks::hook_args_to_json;

    /// A value sent on a oneshot channel is received unchanged.
    #[test]
    fn test_oneshot_channel() {
        let (sender, receiver) = oneshot::channel::<i32>();
        assert!(sender.send(42).is_ok());

        let delivered = receiver.recv().unwrap();
        assert_eq!(delivered, 42);
    }

    /// `EditorInitialized` carries no data and serializes to an empty object.
    #[test]
    fn test_hook_args_to_json_editor_initialized() {
        let serialized = hook_args_to_json(&HookArgs::EditorInitialized).unwrap();
        assert_eq!(serialized, "{}");
    }

    /// `PromptChanged` serializes its fields as top-level JSON keys.
    #[test]
    fn test_hook_args_to_json_prompt_changed() {
        let args = HookArgs::PromptChanged {
            prompt_type: String::from("search"),
            input: String::from("test"),
        };

        let serialized = hook_args_to_json(&args).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap();

        for (key, expected) in [("prompt_type", "search"), ("input", "test")] {
            assert_eq!(parsed[key], expected);
        }
    }
}