1use crate::backend::quickjs_backend::{AsyncResourceOwners, PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25pub use fresh_core::config::PluginConfig;
27
28#[derive(Debug)]
30pub enum PluginRequest {
31 LoadPlugin {
33 path: PathBuf,
34 response: oneshot::Sender<Result<()>>,
35 },
36
37 ResolveCallback {
39 callback_id: fresh_core::api::JsCallbackId,
40 result_json: String,
41 },
42
43 RejectCallback {
45 callback_id: fresh_core::api::JsCallbackId,
46 error: String,
47 },
48
49 LoadPluginsFromDir {
51 dir: PathBuf,
52 response: oneshot::Sender<Vec<String>>,
53 },
54
55 LoadPluginsFromDirWithConfig {
59 dir: PathBuf,
60 plugin_configs: HashMap<String, PluginConfig>,
61 response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
62 },
63
64 LoadPluginFromSource {
66 source: String,
67 name: String,
68 is_typescript: bool,
69 response: oneshot::Sender<Result<()>>,
70 },
71
72 UnloadPlugin {
74 name: String,
75 response: oneshot::Sender<Result<()>>,
76 },
77
78 ReloadPlugin {
80 name: String,
81 response: oneshot::Sender<Result<()>>,
82 },
83
84 ExecuteAction {
86 action_name: String,
87 response: oneshot::Sender<Result<()>>,
88 },
89
90 RunHook { hook_name: String, args: HookArgs },
92
93 HasHookHandlers {
95 hook_name: String,
96 response: oneshot::Sender<bool>,
97 },
98
99 ListPlugins {
101 response: oneshot::Sender<Vec<TsPluginInfo>>,
102 },
103
104 TrackAsyncResource {
107 plugin_name: String,
108 resource: TrackedAsyncResource,
109 },
110
111 Shutdown,
113}
114
115#[derive(Debug)]
118pub enum TrackedAsyncResource {
119 VirtualBuffer(fresh_core::BufferId),
120 CompositeBuffer(fresh_core::BufferId),
121 Terminal(fresh_core::TerminalId),
122}
123
124pub mod oneshot {
126 use std::fmt;
127 use std::sync::mpsc;
128
129 pub struct Sender<T>(mpsc::SyncSender<T>);
130 pub struct Receiver<T>(mpsc::Receiver<T>);
131
132 use anyhow::Result;
133
134 impl<T> fmt::Debug for Sender<T> {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_tuple("Sender").finish()
137 }
138 }
139
140 impl<T> fmt::Debug for Receiver<T> {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 f.debug_tuple("Receiver").finish()
143 }
144 }
145
146 impl<T> Sender<T> {
147 pub fn send(self, value: T) -> Result<(), T> {
148 self.0.send(value).map_err(|e| e.0)
149 }
150 }
151
152 impl<T> Receiver<T> {
153 pub fn recv(self) -> Result<T, mpsc::RecvError> {
154 self.0.recv()
155 }
156
157 pub fn recv_timeout(
158 self,
159 timeout: std::time::Duration,
160 ) -> Result<T, mpsc::RecvTimeoutError> {
161 self.0.recv_timeout(timeout)
162 }
163
164 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
165 self.0.try_recv()
166 }
167 }
168
169 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
170 let (tx, rx) = mpsc::sync_channel(1);
171 (Sender(tx), Receiver(rx))
172 }
173}
174
175pub struct PluginThreadHandle {
177 request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,
180
181 thread_handle: Option<JoinHandle<()>>,
183
184 state_snapshot: Arc<RwLock<EditorStateSnapshot>>,
186
187 pending_responses: PendingResponses,
189
190 command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
192
193 async_resource_owners: AsyncResourceOwners,
197}
198
199impl PluginThreadHandle {
200 pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
202 tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
203
204 let (command_sender, command_receiver) = std::sync::mpsc::channel();
206
207 let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
209
210 let pending_responses: PendingResponses =
212 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
213 let thread_pending_responses = Arc::clone(&pending_responses);
214
215 let async_resource_owners: AsyncResourceOwners =
217 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
218 let thread_async_resource_owners = Arc::clone(&async_resource_owners);
219
220 let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
222
223 let thread_state_snapshot = Arc::clone(&state_snapshot);
225
226 tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
228 let thread_handle = thread::spawn(move || {
229 tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
230 let rt = match tokio::runtime::Builder::new_current_thread()
232 .enable_all()
233 .build()
234 {
235 Ok(rt) => {
236 tracing::debug!("Plugin thread: tokio runtime created successfully");
237 rt
238 }
239 Err(e) => {
240 tracing::error!("Failed to create plugin thread runtime: {}", e);
241 return;
242 }
243 };
244
245 tracing::debug!("Plugin thread: creating QuickJS runtime");
247 let runtime = match QuickJsBackend::with_state_responses_and_resources(
248 Arc::clone(&thread_state_snapshot),
249 command_sender,
250 thread_pending_responses,
251 services.clone(),
252 thread_async_resource_owners,
253 ) {
254 Ok(rt) => {
255 tracing::debug!("Plugin thread: QuickJS runtime created successfully");
256 rt
257 }
258 Err(e) => {
259 tracing::error!("Failed to create QuickJS runtime: {}", e);
260 return;
261 }
262 };
263
264 let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
266
267 tracing::debug!("Plugin thread: starting event loop with LocalSet");
269 let local = tokio::task::LocalSet::new();
270 local.block_on(&rt, async {
271 let runtime = Rc::new(RefCell::new(runtime));
273 tracing::debug!("Plugin thread: entering plugin_thread_loop");
274 plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
275 });
276
277 tracing::info!("Plugin thread shutting down");
278 });
279
280 tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
281 tracing::info!("Plugin thread spawned");
282
283 Ok(Self {
284 request_sender: Some(request_sender),
285 thread_handle: Some(thread_handle),
286 state_snapshot,
287 pending_responses,
288 command_receiver,
289 async_resource_owners,
290 })
291 }
292
293 pub fn is_alive(&self) -> bool {
295 self.thread_handle
296 .as_ref()
297 .map(|h| !h.is_finished())
298 .unwrap_or(false)
299 }
300
301 pub fn check_thread_health(&mut self) {
305 if let Some(handle) = &self.thread_handle {
306 if handle.is_finished() {
307 tracing::error!(
308 "check_thread_health: plugin thread is finished, checking for panic"
309 );
310 if let Some(handle) = self.thread_handle.take() {
312 match handle.join() {
313 Ok(()) => {
314 tracing::warn!("Plugin thread exited normally (unexpected)");
315 }
316 Err(panic_payload) => {
317 std::panic::resume_unwind(panic_payload);
319 }
320 }
321 }
322 }
323 }
324 }
325
326 pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
330 if respond_to_pending(&self.pending_responses, response.clone()) {
332 return;
333 }
334
335 use fresh_core::api::{JsCallbackId, PluginResponse};
337
338 match response {
339 PluginResponse::VirtualBufferCreated {
340 request_id,
341 buffer_id,
342 split_id,
343 } => {
344 self.track_async_resource(
346 request_id,
347 TrackedAsyncResource::VirtualBuffer(buffer_id),
348 );
349 let result = serde_json::json!({
351 "bufferId": buffer_id.0,
352 "splitId": split_id.map(|s| s.0)
353 });
354 self.resolve_callback(JsCallbackId(request_id), result.to_string());
355 }
356 PluginResponse::LspRequest { request_id, result } => match result {
357 Ok(value) => {
358 self.resolve_callback(JsCallbackId(request_id), value.to_string());
359 }
360 Err(e) => {
361 self.reject_callback(JsCallbackId(request_id), e);
362 }
363 },
364 PluginResponse::HighlightsComputed { request_id, spans } => {
365 let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
366 self.resolve_callback(JsCallbackId(request_id), result);
367 }
368 PluginResponse::BufferText { request_id, text } => match text {
369 Ok(content) => {
370 let result =
372 serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
373 self.resolve_callback(JsCallbackId(request_id), result);
374 }
375 Err(e) => {
376 self.reject_callback(JsCallbackId(request_id), e);
377 }
378 },
379 PluginResponse::CompositeBufferCreated {
380 request_id,
381 buffer_id,
382 } => {
383 self.track_async_resource(
385 request_id,
386 TrackedAsyncResource::CompositeBuffer(buffer_id),
387 );
388 self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
390 }
391 PluginResponse::LineStartPosition {
392 request_id,
393 position,
394 } => {
395 let result =
397 serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
398 self.resolve_callback(JsCallbackId(request_id), result);
399 }
400 PluginResponse::LineEndPosition {
401 request_id,
402 position,
403 } => {
404 let result =
406 serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
407 self.resolve_callback(JsCallbackId(request_id), result);
408 }
409 PluginResponse::BufferLineCount { request_id, count } => {
410 let result = serde_json::to_string(&count).unwrap_or_else(|_| "null".to_string());
412 self.resolve_callback(JsCallbackId(request_id), result);
413 }
414 PluginResponse::TerminalCreated {
415 request_id,
416 buffer_id,
417 terminal_id,
418 split_id,
419 } => {
420 self.track_async_resource(request_id, TrackedAsyncResource::Terminal(terminal_id));
422 let result = serde_json::json!({
423 "bufferId": buffer_id.0,
424 "terminalId": terminal_id.0,
425 "splitId": split_id.map(|s| s.0)
426 });
427 self.resolve_callback(JsCallbackId(request_id), result.to_string());
428 }
429 PluginResponse::SplitByLabel {
430 request_id,
431 split_id,
432 } => {
433 let result = serde_json::to_string(&split_id.map(|s| s.0))
434 .unwrap_or_else(|_| "null".to_string());
435 self.resolve_callback(JsCallbackId(request_id), result);
436 }
437 }
438 }
439
440 fn track_async_resource(&self, request_id: u64, resource: TrackedAsyncResource) {
443 let plugin_name = self
444 .async_resource_owners
445 .lock()
446 .ok()
447 .and_then(|mut owners| owners.remove(&request_id));
448 if let Some(plugin_name) = plugin_name {
449 if let Some(sender) = self.request_sender.as_ref() {
450 let _ = sender.send(PluginRequest::TrackAsyncResource {
451 plugin_name,
452 resource,
453 });
454 }
455 }
456 }
457
458 pub fn load_plugin(&self, path: &Path) -> Result<()> {
460 let (tx, rx) = oneshot::channel();
461 self.request_sender
462 .as_ref()
463 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
464 .send(PluginRequest::LoadPlugin {
465 path: path.to_path_buf(),
466 response: tx,
467 })
468 .map_err(|_| anyhow!("Plugin thread not responding"))?;
469
470 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
471 }
472
473 pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
475 let (tx, rx) = oneshot::channel();
476 let Some(sender) = self.request_sender.as_ref() else {
477 return vec!["Plugin thread shut down".to_string()];
478 };
479 if sender
480 .send(PluginRequest::LoadPluginsFromDir {
481 dir: dir.to_path_buf(),
482 response: tx,
483 })
484 .is_err()
485 {
486 return vec!["Plugin thread not responding".to_string()];
487 }
488
489 rx.recv()
490 .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
491 }
492
493 pub fn load_plugins_from_dir_with_config(
497 &self,
498 dir: &Path,
499 plugin_configs: &HashMap<String, PluginConfig>,
500 ) -> (Vec<String>, HashMap<String, PluginConfig>) {
501 let (tx, rx) = oneshot::channel();
502 let Some(sender) = self.request_sender.as_ref() else {
503 return (vec!["Plugin thread shut down".to_string()], HashMap::new());
504 };
505 if sender
506 .send(PluginRequest::LoadPluginsFromDirWithConfig {
507 dir: dir.to_path_buf(),
508 plugin_configs: plugin_configs.clone(),
509 response: tx,
510 })
511 .is_err()
512 {
513 return (
514 vec!["Plugin thread not responding".to_string()],
515 HashMap::new(),
516 );
517 }
518
519 rx.recv()
520 .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
521 }
522
523 pub fn load_plugin_from_source(
528 &self,
529 source: &str,
530 name: &str,
531 is_typescript: bool,
532 ) -> Result<()> {
533 let (tx, rx) = oneshot::channel();
534 self.request_sender
535 .as_ref()
536 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
537 .send(PluginRequest::LoadPluginFromSource {
538 source: source.to_string(),
539 name: name.to_string(),
540 is_typescript,
541 response: tx,
542 })
543 .map_err(|_| anyhow!("Plugin thread not responding"))?;
544
545 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
546 }
547
548 pub fn unload_plugin(&self, name: &str) -> Result<()> {
550 let (tx, rx) = oneshot::channel();
551 self.request_sender
552 .as_ref()
553 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
554 .send(PluginRequest::UnloadPlugin {
555 name: name.to_string(),
556 response: tx,
557 })
558 .map_err(|_| anyhow!("Plugin thread not responding"))?;
559
560 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
561 }
562
563 pub fn reload_plugin(&self, name: &str) -> Result<()> {
565 let (tx, rx) = oneshot::channel();
566 self.request_sender
567 .as_ref()
568 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
569 .send(PluginRequest::ReloadPlugin {
570 name: name.to_string(),
571 response: tx,
572 })
573 .map_err(|_| anyhow!("Plugin thread not responding"))?;
574
575 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
576 }
577
578 pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
583 tracing::trace!("execute_action_async: starting action '{}'", action_name);
584 let (tx, rx) = oneshot::channel();
585 self.request_sender
586 .as_ref()
587 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
588 .send(PluginRequest::ExecuteAction {
589 action_name: action_name.to_string(),
590 response: tx,
591 })
592 .map_err(|_| anyhow!("Plugin thread not responding"))?;
593
594 tracing::trace!("execute_action_async: request sent for '{}'", action_name);
595 Ok(rx)
596 }
597
598 pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
604 if let Some(sender) = self.request_sender.as_ref() {
605 let _ = sender.send(PluginRequest::RunHook {
606 hook_name: hook_name.to_string(),
607 args,
608 });
609 }
610 }
611
612 pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
614 let (tx, rx) = oneshot::channel();
615 let Some(sender) = self.request_sender.as_ref() else {
616 return false;
617 };
618 if sender
619 .send(PluginRequest::HasHookHandlers {
620 hook_name: hook_name.to_string(),
621 response: tx,
622 })
623 .is_err()
624 {
625 return false;
626 }
627
628 rx.recv().unwrap_or(false)
629 }
630
631 pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
633 let (tx, rx) = oneshot::channel();
634 let Some(sender) = self.request_sender.as_ref() else {
635 return vec![];
636 };
637 if sender
638 .send(PluginRequest::ListPlugins { response: tx })
639 .is_err()
640 {
641 return vec![];
642 }
643
644 rx.recv().unwrap_or_default()
645 }
646
647 pub fn process_commands(&mut self) -> Vec<PluginCommand> {
652 let mut commands = Vec::new();
653 while let Ok(cmd) = self.command_receiver.try_recv() {
654 commands.push(cmd);
655 }
656 commands
657 }
658
659 pub fn process_commands_until_hook_completed(
669 &mut self,
670 hook_name: &str,
671 timeout: std::time::Duration,
672 ) -> Vec<PluginCommand> {
673 let mut commands = Vec::new();
674 let deadline = std::time::Instant::now() + timeout;
675
676 loop {
677 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
678 if remaining.is_zero() {
679 while let Ok(cmd) = self.command_receiver.try_recv() {
681 if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
682 commands.push(cmd);
683 }
684 }
685 break;
686 }
687
688 match self.command_receiver.recv_timeout(remaining) {
689 Ok(PluginCommand::HookCompleted {
690 hook_name: ref name,
691 }) if name == hook_name => {
692 while let Ok(cmd) = self.command_receiver.try_recv() {
694 if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
695 commands.push(cmd);
696 }
697 }
698 break;
699 }
700 Ok(PluginCommand::HookCompleted { .. }) => {
701 continue;
703 }
704 Ok(cmd) => {
705 commands.push(cmd);
706 }
707 Err(_) => {
708 break;
710 }
711 }
712 }
713
714 commands
715 }
716
717 pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
719 Arc::clone(&self.state_snapshot)
720 }
721
722 pub fn shutdown(&mut self) {
724 tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
725
726 if let Ok(mut pending) = self.pending_responses.lock() {
729 if !pending.is_empty() {
730 tracing::warn!(
731 "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
732 pending.len(),
733 pending.keys().collect::<Vec<_>>()
734 );
735 pending.clear(); }
737 }
738
739 if let Some(sender) = self.request_sender.as_ref() {
741 tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
742 let _ = sender.send(PluginRequest::Shutdown);
743 }
744
745 tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
748 self.request_sender.take();
749
750 if let Some(handle) = self.thread_handle.take() {
751 tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
752 let _ = handle.join();
753 tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
754 }
755
756 tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
757 }
758
759 pub fn resolve_callback(
762 &self,
763 callback_id: fresh_core::api::JsCallbackId,
764 result_json: String,
765 ) {
766 if let Some(sender) = self.request_sender.as_ref() {
767 let _ = sender.send(PluginRequest::ResolveCallback {
768 callback_id,
769 result_json,
770 });
771 }
772 }
773
774 pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
777 if let Some(sender) = self.request_sender.as_ref() {
778 let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
779 }
780 }
781}
782
783impl Drop for PluginThreadHandle {
784 fn drop(&mut self) {
785 self.shutdown();
786 }
787}
788
789fn respond_to_pending(
790 pending_responses: &PendingResponses,
791 response: fresh_core::api::PluginResponse,
792) -> bool {
793 let request_id = match &response {
794 fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
795 fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
796 fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
797 fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
798 fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
799 fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
800 fresh_core::api::PluginResponse::LineEndPosition { request_id, .. } => *request_id,
801 fresh_core::api::PluginResponse::BufferLineCount { request_id, .. } => *request_id,
802 fresh_core::api::PluginResponse::TerminalCreated { request_id, .. } => *request_id,
803 fresh_core::api::PluginResponse::SplitByLabel { request_id, .. } => *request_id,
804 };
805
806 let sender = {
807 let mut pending = pending_responses.lock().unwrap();
808 pending.remove(&request_id)
809 };
810
811 if let Some(tx) = sender {
812 let _ = tx.send(response);
813 true
814 } else {
815 false
816 }
817}
818
819#[cfg(test)]
820mod plugin_thread_tests {
821 use super::*;
822 use fresh_core::api::PluginResponse;
823 use serde_json::json;
824 use std::collections::HashMap;
825 use std::sync::{Arc, Mutex};
826 use tokio::sync::oneshot;
827
828 #[test]
829 fn respond_to_pending_sends_lsp_response() {
830 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
831 let (tx, mut rx) = oneshot::channel();
832 pending.lock().unwrap().insert(123, tx);
833
834 respond_to_pending(
835 &pending,
836 PluginResponse::LspRequest {
837 request_id: 123,
838 result: Ok(json!({ "key": "value" })),
839 },
840 );
841
842 let response = rx.try_recv().expect("expected response");
843 match response {
844 PluginResponse::LspRequest { result, .. } => {
845 assert_eq!(result.unwrap(), json!({ "key": "value" }));
846 }
847 _ => panic!("unexpected variant"),
848 }
849
850 assert!(pending.lock().unwrap().is_empty());
851 }
852
853 #[test]
854 fn respond_to_pending_handles_virtual_buffer_created() {
855 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
856 let (tx, mut rx) = oneshot::channel();
857 pending.lock().unwrap().insert(456, tx);
858
859 respond_to_pending(
860 &pending,
861 PluginResponse::VirtualBufferCreated {
862 request_id: 456,
863 buffer_id: fresh_core::BufferId(7),
864 split_id: Some(fresh_core::SplitId(1)),
865 },
866 );
867
868 let response = rx.try_recv().expect("expected response");
869 match response {
870 PluginResponse::VirtualBufferCreated { buffer_id, .. } => {
871 assert_eq!(buffer_id.0, 7);
872 }
873 _ => panic!("unexpected variant"),
874 }
875
876 assert!(pending.lock().unwrap().is_empty());
877 }
878}
879
880async fn plugin_thread_loop(
886 runtime: Rc<RefCell<QuickJsBackend>>,
887 plugins: &mut HashMap<String, TsPluginInfo>,
888 mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
889) {
890 tracing::info!("Plugin thread event loop started");
891
892 let poll_interval = Duration::from_millis(1);
894 let mut has_pending_work = false;
895
896 loop {
897 if crate::backend::has_fatal_js_error() {
901 if let Some(error_msg) = crate::backend::take_fatal_js_error() {
902 tracing::error!(
903 "Fatal JS error detected, terminating plugin thread: {}",
904 error_msg
905 );
906 panic!("Fatal plugin error: {}", error_msg);
907 }
908 }
909
910 tokio::select! {
911 biased; request = request_receiver.recv() => {
914 match request {
915 Some(PluginRequest::ExecuteAction {
916 action_name,
917 response,
918 }) => {
919 let result = runtime.borrow_mut().start_action(&action_name);
922 let _ = response.send(result);
923 has_pending_work = true; }
925 Some(request) => {
926 let should_shutdown =
927 handle_request(request, Rc::clone(&runtime), plugins).await;
928
929 if should_shutdown {
930 break;
931 }
932 has_pending_work = true; }
934 None => {
935 tracing::info!("Plugin thread request channel closed");
937 break;
938 }
939 }
940 }
941
942 _ = tokio::time::sleep(poll_interval), if has_pending_work => {
944 has_pending_work = runtime.borrow_mut().poll_event_loop_once();
945 }
946 }
947 }
948}
949
950#[allow(clippy::await_holding_refcell_ref)]
969async fn run_hook_internal_rc(
970 runtime: Rc<RefCell<QuickJsBackend>>,
971 hook_name: &str,
972 args: &HookArgs,
973) -> Result<()> {
974 let json_start = std::time::Instant::now();
977 let json_data = fresh_core::hooks::hook_args_to_json(args)?;
978 tracing::trace!(
979 hook = hook_name,
980 json_us = json_start.elapsed().as_micros(),
981 "hook args serialized"
982 );
983
984 let emit_start = std::time::Instant::now();
986 runtime.borrow_mut().emit(hook_name, &json_data).await?;
987 tracing::trace!(
988 hook = hook_name,
989 emit_ms = emit_start.elapsed().as_millis(),
990 "emit completed"
991 );
992
993 Ok(())
994}
995
996async fn handle_request(
998 request: PluginRequest,
999 runtime: Rc<RefCell<QuickJsBackend>>,
1000 plugins: &mut HashMap<String, TsPluginInfo>,
1001) -> bool {
1002 match request {
1003 PluginRequest::LoadPlugin { path, response } => {
1004 let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
1005 let _ = response.send(result);
1006 }
1007
1008 PluginRequest::LoadPluginsFromDir { dir, response } => {
1009 let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
1010 let _ = response.send(errors);
1011 }
1012
1013 PluginRequest::LoadPluginsFromDirWithConfig {
1014 dir,
1015 plugin_configs,
1016 response,
1017 } => {
1018 let (errors, discovered) = load_plugins_from_dir_with_config_internal(
1019 Rc::clone(&runtime),
1020 plugins,
1021 &dir,
1022 &plugin_configs,
1023 )
1024 .await;
1025 let _ = response.send((errors, discovered));
1026 }
1027
1028 PluginRequest::LoadPluginFromSource {
1029 source,
1030 name,
1031 is_typescript,
1032 response,
1033 } => {
1034 let result = load_plugin_from_source_internal(
1035 Rc::clone(&runtime),
1036 plugins,
1037 &source,
1038 &name,
1039 is_typescript,
1040 );
1041 let _ = response.send(result);
1042 }
1043
1044 PluginRequest::UnloadPlugin { name, response } => {
1045 let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
1046 let _ = response.send(result);
1047 }
1048
1049 PluginRequest::ReloadPlugin { name, response } => {
1050 let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
1051 let _ = response.send(result);
1052 }
1053
1054 PluginRequest::ExecuteAction {
1055 action_name,
1056 response,
1057 } => {
1058 tracing::error!(
1061 "ExecuteAction should be handled in main loop, not here: {}",
1062 action_name
1063 );
1064 let _ = response.send(Err(anyhow::anyhow!(
1065 "Internal error: ExecuteAction in wrong handler"
1066 )));
1067 }
1068
1069 PluginRequest::RunHook { hook_name, args } => {
1070 let hook_start = std::time::Instant::now();
1072 if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
1074 tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
1075 } else {
1076 tracing::trace!(hook = %hook_name, "RunHook request received");
1077 }
1078 if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
1079 let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
1080 tracing::error!("{}", error_msg);
1081 runtime.borrow_mut().send_status(error_msg);
1083 }
1084 runtime.borrow().send_hook_completed(hook_name.clone());
1087 if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
1088 tracing::info!(
1089 hook = %hook_name,
1090 elapsed_ms = hook_start.elapsed().as_millis(),
1091 "RunHook completed (prompt hook)"
1092 );
1093 } else {
1094 tracing::trace!(
1095 hook = %hook_name,
1096 elapsed_ms = hook_start.elapsed().as_millis(),
1097 "RunHook completed"
1098 );
1099 }
1100 }
1101
1102 PluginRequest::HasHookHandlers {
1103 hook_name,
1104 response,
1105 } => {
1106 let has_handlers = runtime.borrow().has_handlers(&hook_name);
1107 let _ = response.send(has_handlers);
1108 }
1109
1110 PluginRequest::ListPlugins { response } => {
1111 let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
1112 let _ = response.send(plugin_list);
1113 }
1114
1115 PluginRequest::ResolveCallback {
1116 callback_id,
1117 result_json,
1118 } => {
1119 tracing::info!(
1120 "ResolveCallback: resolving callback_id={} with result_json={}",
1121 callback_id,
1122 result_json
1123 );
1124 runtime
1125 .borrow_mut()
1126 .resolve_callback(callback_id, &result_json);
1127 tracing::info!(
1129 "ResolveCallback: done resolving callback_id={}",
1130 callback_id
1131 );
1132 }
1133
1134 PluginRequest::RejectCallback { callback_id, error } => {
1135 runtime.borrow_mut().reject_callback(callback_id, &error);
1136 }
1138
1139 PluginRequest::TrackAsyncResource {
1140 plugin_name,
1141 resource,
1142 } => {
1143 let rt = runtime.borrow();
1144 let mut tracked = rt.plugin_tracked_state.borrow_mut();
1145 let state = tracked.entry(plugin_name).or_default();
1146 match resource {
1147 TrackedAsyncResource::VirtualBuffer(buffer_id) => {
1148 state.virtual_buffer_ids.push(buffer_id);
1149 }
1150 TrackedAsyncResource::CompositeBuffer(buffer_id) => {
1151 state.composite_buffer_ids.push(buffer_id);
1152 }
1153 TrackedAsyncResource::Terminal(terminal_id) => {
1154 state.terminal_ids.push(terminal_id);
1155 }
1156 }
1157 }
1158
1159 PluginRequest::Shutdown => {
1160 tracing::info!("Plugin thread received shutdown request");
1161 return true;
1162 }
1163 }
1164
1165 false
1166}
1167
1168#[allow(clippy::await_holding_refcell_ref)]
1176async fn load_plugin_internal(
1177 runtime: Rc<RefCell<QuickJsBackend>>,
1178 plugins: &mut HashMap<String, TsPluginInfo>,
1179 path: &Path,
1180) -> Result<()> {
1181 let plugin_name = path
1182 .file_stem()
1183 .and_then(|s| s.to_str())
1184 .ok_or_else(|| anyhow!("Invalid plugin filename"))?
1185 .to_string();
1186
1187 tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
1188 tracing::debug!(
1189 "load_plugin_internal: starting module load for plugin '{}'",
1190 plugin_name
1191 );
1192
1193 let path_str = path
1195 .to_str()
1196 .ok_or_else(|| anyhow!("Invalid path encoding"))?;
1197
1198 let i18n_path = path.with_extension("i18n.json");
1200 if i18n_path.exists() {
1201 if let Ok(content) = std::fs::read_to_string(&i18n_path) {
1202 if let Ok(strings) = serde_json::from_str::<
1203 std::collections::HashMap<String, std::collections::HashMap<String, String>>,
1204 >(&content)
1205 {
1206 runtime
1207 .borrow_mut()
1208 .services
1209 .register_plugin_strings(&plugin_name, strings);
1210 tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
1211 }
1212 }
1213 }
1214
1215 let load_start = std::time::Instant::now();
1216 runtime
1217 .borrow_mut()
1218 .load_module_with_source(path_str, &plugin_name)
1219 .await?;
1220 let load_elapsed = load_start.elapsed();
1221
1222 tracing::debug!(
1223 "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
1224 plugin_name,
1225 load_elapsed
1226 );
1227
1228 plugins.insert(
1230 plugin_name.clone(),
1231 TsPluginInfo {
1232 name: plugin_name.clone(),
1233 path: path.to_path_buf(),
1234 enabled: true,
1235 },
1236 );
1237
1238 tracing::debug!(
1239 "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1240 plugin_name,
1241 plugins.len()
1242 );
1243
1244 Ok(())
1245}
1246
1247async fn load_plugins_from_dir_internal(
1249 runtime: Rc<RefCell<QuickJsBackend>>,
1250 plugins: &mut HashMap<String, TsPluginInfo>,
1251 dir: &Path,
1252) -> Vec<String> {
1253 tracing::debug!(
1254 "load_plugins_from_dir_internal: scanning directory {:?}",
1255 dir
1256 );
1257 let mut errors = Vec::new();
1258
1259 if !dir.exists() {
1260 tracing::warn!("Plugin directory does not exist: {:?}", dir);
1261 return errors;
1262 }
1263
1264 match std::fs::read_dir(dir) {
1266 Ok(entries) => {
1267 for entry in entries.flatten() {
1268 let path = entry.path();
1269 let ext = path.extension().and_then(|s| s.to_str());
1270 if ext == Some("ts") || ext == Some("js") {
1271 tracing::debug!(
1272 "load_plugins_from_dir_internal: attempting to load {:?}",
1273 path
1274 );
1275 if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1276 {
1277 let err = format!("Failed to load {:?}: {}", path, e);
1278 tracing::error!("{}", err);
1279 errors.push(err);
1280 }
1281 }
1282 }
1283
1284 tracing::debug!(
1285 "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1286 dir,
1287 errors.len()
1288 );
1289 }
1290 Err(e) => {
1291 let err = format!("Failed to read plugin directory: {}", e);
1292 tracing::error!("{}", err);
1293 errors.push(err);
1294 }
1295 }
1296
1297 errors
1298}
1299
1300async fn load_plugins_from_dir_with_config_internal(
1304 runtime: Rc<RefCell<QuickJsBackend>>,
1305 plugins: &mut HashMap<String, TsPluginInfo>,
1306 dir: &Path,
1307 plugin_configs: &HashMap<String, PluginConfig>,
1308) -> (Vec<String>, HashMap<String, PluginConfig>) {
1309 tracing::debug!(
1310 "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1311 dir
1312 );
1313 let mut errors = Vec::new();
1314 let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1315
1316 if !dir.exists() {
1317 tracing::warn!("Plugin directory does not exist: {:?}", dir);
1318 return (errors, discovered_plugins);
1319 }
1320
1321 let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1323 match std::fs::read_dir(dir) {
1324 Ok(entries) => {
1325 for entry in entries.flatten() {
1326 let path = entry.path();
1327 let ext = path.extension().and_then(|s| s.to_str());
1328 if ext == Some("ts") || ext == Some("js") {
1329 if path.to_string_lossy().contains(".i18n.") {
1331 continue;
1332 }
1333 let plugin_name = path
1335 .file_stem()
1336 .and_then(|s| s.to_str())
1337 .unwrap_or("unknown")
1338 .to_string();
1339 plugin_files.push((plugin_name, path));
1340 }
1341 }
1342 }
1343 Err(e) => {
1344 let err = format!("Failed to read plugin directory: {}", e);
1345 tracing::error!("{}", err);
1346 errors.push(err);
1347 return (errors, discovered_plugins);
1348 }
1349 }
1350
1351 for (plugin_name, path) in plugin_files {
1353 let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1355 PluginConfig {
1357 enabled: existing_config.enabled,
1358 path: Some(path.clone()),
1359 }
1360 } else {
1361 PluginConfig::new_with_path(path.clone())
1363 };
1364
1365 discovered_plugins.insert(plugin_name.clone(), config.clone());
1367
1368 if config.enabled {
1370 tracing::debug!(
1371 "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1372 plugin_name
1373 );
1374 if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1375 let err = format!("Failed to load {:?}: {}", path, e);
1376 tracing::error!("{}", err);
1377 errors.push(err);
1378 }
1379 } else {
1380 tracing::info!(
1381 "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1382 plugin_name
1383 );
1384 }
1385 }
1386
1387 tracing::debug!(
1388 "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1389 discovered_plugins.len(),
1390 errors.len()
1391 );
1392
1393 (errors, discovered_plugins)
1394}
1395
1396fn load_plugin_from_source_internal(
1401 runtime: Rc<RefCell<QuickJsBackend>>,
1402 plugins: &mut HashMap<String, TsPluginInfo>,
1403 source: &str,
1404 name: &str,
1405 is_typescript: bool,
1406) -> Result<()> {
1407 if plugins.contains_key(name) {
1409 tracing::info!(
1410 "Hot-reloading buffer plugin '{}' — unloading previous version",
1411 name
1412 );
1413 unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1414 }
1415
1416 tracing::info!("Loading plugin from source: {}", name);
1417
1418 runtime
1419 .borrow_mut()
1420 .execute_source(source, name, is_typescript)?;
1421
1422 plugins.insert(
1424 name.to_string(),
1425 TsPluginInfo {
1426 name: name.to_string(),
1427 path: PathBuf::from(format!("<buffer:{}>", name)),
1428 enabled: true,
1429 },
1430 );
1431
1432 tracing::info!(
1433 "Buffer plugin '{}' loaded successfully, total plugins: {}",
1434 name,
1435 plugins.len()
1436 );
1437
1438 Ok(())
1439}
1440
1441fn unload_plugin_internal(
1443 runtime: Rc<RefCell<QuickJsBackend>>,
1444 plugins: &mut HashMap<String, TsPluginInfo>,
1445 name: &str,
1446) -> Result<()> {
1447 if plugins.remove(name).is_some() {
1448 tracing::info!("Unloading TypeScript plugin: {}", name);
1449
1450 runtime
1452 .borrow_mut()
1453 .services
1454 .unregister_plugin_strings(name);
1455
1456 runtime
1458 .borrow()
1459 .services
1460 .unregister_commands_by_plugin(name);
1461
1462 runtime.borrow().cleanup_plugin(name);
1464
1465 Ok(())
1466 } else {
1467 Err(anyhow!("Plugin '{}' not found", name))
1468 }
1469}
1470
1471async fn reload_plugin_internal(
1473 runtime: Rc<RefCell<QuickJsBackend>>,
1474 plugins: &mut HashMap<String, TsPluginInfo>,
1475 name: &str,
1476) -> Result<()> {
1477 let path = plugins
1478 .get(name)
1479 .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1480 .path
1481 .clone();
1482
1483 unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1484 load_plugin_internal(runtime, plugins, &path).await?;
1485
1486 Ok(())
1487}
1488
1489#[cfg(test)]
1490mod tests {
1491 use super::*;
1492 use fresh_core::hooks::hook_args_to_json;
1493
1494 #[test]
1495 fn test_oneshot_channel() {
1496 let (tx, rx) = oneshot::channel::<i32>();
1497 assert!(tx.send(42).is_ok());
1498 assert_eq!(rx.recv().unwrap(), 42);
1499 }
1500
1501 #[test]
1502 fn test_hook_args_to_json_editor_initialized() {
1503 let args = HookArgs::EditorInitialized;
1504 let json = hook_args_to_json(&args).unwrap();
1505 assert_eq!(json, serde_json::json!({}));
1506 }
1507
1508 #[test]
1509 fn test_hook_args_to_json_prompt_changed() {
1510 let args = HookArgs::PromptChanged {
1511 prompt_type: "search".to_string(),
1512 input: "test".to_string(),
1513 };
1514 let json = hook_args_to_json(&args).unwrap();
1515 assert_eq!(json["prompt_type"], "search");
1516 assert_eq!(json["input"], "test");
1517 }
1518}