1use crate::backend::quickjs_backend::{AsyncResourceOwners, PendingResponses, TsPluginInfo};
13use crate::backend::QuickJsBackend;
14use anyhow::{anyhow, Result};
15use fresh_core::api::{EditorStateSnapshot, PluginCommand};
16use fresh_core::hooks::HookArgs;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::rc::Rc;
21use std::sync::{Arc, RwLock};
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25pub use fresh_core::config::PluginConfig;
27
28#[derive(Debug)]
30pub enum PluginRequest {
31 LoadPlugin {
33 path: PathBuf,
34 response: oneshot::Sender<Result<()>>,
35 },
36
37 ResolveCallback {
39 callback_id: fresh_core::api::JsCallbackId,
40 result_json: String,
41 },
42
43 RejectCallback {
45 callback_id: fresh_core::api::JsCallbackId,
46 error: String,
47 },
48
49 CallStreamingCallback {
51 callback_id: fresh_core::api::JsCallbackId,
52 result_json: String,
53 done: bool,
54 },
55
56 LoadPluginsFromDir {
58 dir: PathBuf,
59 response: oneshot::Sender<Vec<String>>,
60 },
61
62 LoadPluginsFromDirWithConfig {
66 dir: PathBuf,
67 plugin_configs: HashMap<String, PluginConfig>,
68 response: oneshot::Sender<(Vec<String>, HashMap<String, PluginConfig>)>,
69 },
70
71 LoadPluginFromSource {
73 source: String,
74 name: String,
75 is_typescript: bool,
76 response: oneshot::Sender<Result<()>>,
77 },
78
79 UnloadPlugin {
81 name: String,
82 response: oneshot::Sender<Result<()>>,
83 },
84
85 ReloadPlugin {
87 name: String,
88 response: oneshot::Sender<Result<()>>,
89 },
90
91 ExecuteAction {
93 action_name: String,
94 response: oneshot::Sender<Result<()>>,
95 },
96
97 RunHook { hook_name: String, args: HookArgs },
99
100 HasHookHandlers {
102 hook_name: String,
103 response: oneshot::Sender<bool>,
104 },
105
106 ListPlugins {
108 response: oneshot::Sender<Vec<TsPluginInfo>>,
109 },
110
111 TrackAsyncResource {
114 plugin_name: String,
115 resource: TrackedAsyncResource,
116 },
117
118 Shutdown,
120}
121
122#[derive(Debug)]
125pub enum TrackedAsyncResource {
126 VirtualBuffer(fresh_core::BufferId),
127 CompositeBuffer(fresh_core::BufferId),
128 Terminal(fresh_core::TerminalId),
129}
130
131pub mod oneshot {
133 use std::fmt;
134 use std::sync::mpsc;
135
136 pub struct Sender<T>(mpsc::SyncSender<T>);
137 pub struct Receiver<T>(mpsc::Receiver<T>);
138
139 use anyhow::Result;
140
141 impl<T> fmt::Debug for Sender<T> {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 f.debug_tuple("Sender").finish()
144 }
145 }
146
147 impl<T> fmt::Debug for Receiver<T> {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_tuple("Receiver").finish()
150 }
151 }
152
153 impl<T> Sender<T> {
154 pub fn send(self, value: T) -> Result<(), T> {
155 self.0.send(value).map_err(|e| e.0)
156 }
157 }
158
159 impl<T> Receiver<T> {
160 pub fn recv(self) -> Result<T, mpsc::RecvError> {
161 self.0.recv()
162 }
163
164 pub fn recv_timeout(
165 self,
166 timeout: std::time::Duration,
167 ) -> Result<T, mpsc::RecvTimeoutError> {
168 self.0.recv_timeout(timeout)
169 }
170
171 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
172 self.0.try_recv()
173 }
174 }
175
176 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
177 let (tx, rx) = mpsc::sync_channel(1);
178 (Sender(tx), Receiver(rx))
179 }
180}
181
182pub struct PluginThreadHandle {
184 request_sender: Option<tokio::sync::mpsc::UnboundedSender<PluginRequest>>,
187
188 thread_handle: Option<JoinHandle<()>>,
190
191 state_snapshot: Arc<RwLock<EditorStateSnapshot>>,
193
194 pending_responses: PendingResponses,
196
197 command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
199
200 async_resource_owners: AsyncResourceOwners,
204}
205
206impl PluginThreadHandle {
207 pub fn spawn(services: Arc<dyn fresh_core::services::PluginServiceBridge>) -> Result<Self> {
209 tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
210
211 let (command_sender, command_receiver) = std::sync::mpsc::channel();
213
214 let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
216
217 let pending_responses: PendingResponses =
219 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
220 let thread_pending_responses = Arc::clone(&pending_responses);
221
222 let async_resource_owners: AsyncResourceOwners =
224 Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
225 let thread_async_resource_owners = Arc::clone(&async_resource_owners);
226
227 let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
229
230 let thread_state_snapshot = Arc::clone(&state_snapshot);
232
233 tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
235 let thread_handle = thread::spawn(move || {
236 tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
237 let rt = match tokio::runtime::Builder::new_current_thread()
239 .enable_all()
240 .build()
241 {
242 Ok(rt) => {
243 tracing::debug!("Plugin thread: tokio runtime created successfully");
244 rt
245 }
246 Err(e) => {
247 tracing::error!("Failed to create plugin thread runtime: {}", e);
248 return;
249 }
250 };
251
252 tracing::debug!("Plugin thread: creating QuickJS runtime");
254 let runtime = match QuickJsBackend::with_state_responses_and_resources(
255 Arc::clone(&thread_state_snapshot),
256 command_sender,
257 thread_pending_responses,
258 services.clone(),
259 thread_async_resource_owners,
260 ) {
261 Ok(rt) => {
262 tracing::debug!("Plugin thread: QuickJS runtime created successfully");
263 rt
264 }
265 Err(e) => {
266 tracing::error!("Failed to create QuickJS runtime: {}", e);
267 return;
268 }
269 };
270
271 let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
273
274 tracing::debug!("Plugin thread: starting event loop with LocalSet");
276 let local = tokio::task::LocalSet::new();
277 local.block_on(&rt, async {
278 let runtime = Rc::new(RefCell::new(runtime));
280 tracing::debug!("Plugin thread: entering plugin_thread_loop");
281 plugin_thread_loop(runtime, &mut plugins, request_receiver).await;
282 });
283
284 tracing::info!("Plugin thread shutting down");
285 });
286
287 tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
288 tracing::info!("Plugin thread spawned");
289
290 Ok(Self {
291 request_sender: Some(request_sender),
292 thread_handle: Some(thread_handle),
293 state_snapshot,
294 pending_responses,
295 command_receiver,
296 async_resource_owners,
297 })
298 }
299
300 pub fn is_alive(&self) -> bool {
302 self.thread_handle
303 .as_ref()
304 .map(|h| !h.is_finished())
305 .unwrap_or(false)
306 }
307
308 pub fn check_thread_health(&mut self) {
312 if let Some(handle) = &self.thread_handle {
313 if handle.is_finished() {
314 tracing::error!(
315 "check_thread_health: plugin thread is finished, checking for panic"
316 );
317 if let Some(handle) = self.thread_handle.take() {
319 match handle.join() {
320 Ok(()) => {
321 tracing::warn!("Plugin thread exited normally (unexpected)");
322 }
323 Err(panic_payload) => {
324 std::panic::resume_unwind(panic_payload);
326 }
327 }
328 }
329 }
330 }
331 }
332
333 pub fn deliver_response(&self, response: fresh_core::api::PluginResponse) {
337 if respond_to_pending(&self.pending_responses, response.clone()) {
339 return;
340 }
341
342 use fresh_core::api::{JsCallbackId, PluginResponse};
344
345 match response {
346 PluginResponse::VirtualBufferCreated {
347 request_id,
348 buffer_id,
349 split_id,
350 } => {
351 self.track_async_resource(
353 request_id,
354 TrackedAsyncResource::VirtualBuffer(buffer_id),
355 );
356 let result = serde_json::json!({
358 "bufferId": buffer_id.0,
359 "splitId": split_id.map(|s| s.0)
360 });
361 self.resolve_callback(JsCallbackId(request_id), result.to_string());
362 }
363 PluginResponse::LspRequest { request_id, result } => match result {
364 Ok(value) => {
365 self.resolve_callback(JsCallbackId(request_id), value.to_string());
366 }
367 Err(e) => {
368 self.reject_callback(JsCallbackId(request_id), e);
369 }
370 },
371 PluginResponse::HighlightsComputed { request_id, spans } => {
372 let result = serde_json::to_string(&spans).unwrap_or_else(|_| "[]".to_string());
373 self.resolve_callback(JsCallbackId(request_id), result);
374 }
375 PluginResponse::BufferText { request_id, text } => match text {
376 Ok(content) => {
377 let result =
379 serde_json::to_string(&content).unwrap_or_else(|_| "\"\"".to_string());
380 self.resolve_callback(JsCallbackId(request_id), result);
381 }
382 Err(e) => {
383 self.reject_callback(JsCallbackId(request_id), e);
384 }
385 },
386 PluginResponse::CompositeBufferCreated {
387 request_id,
388 buffer_id,
389 } => {
390 self.track_async_resource(
392 request_id,
393 TrackedAsyncResource::CompositeBuffer(buffer_id),
394 );
395 self.resolve_callback(JsCallbackId(request_id), buffer_id.0.to_string());
397 }
398 PluginResponse::LineStartPosition {
399 request_id,
400 position,
401 } => {
402 let result =
404 serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
405 self.resolve_callback(JsCallbackId(request_id), result);
406 }
407 PluginResponse::LineEndPosition {
408 request_id,
409 position,
410 } => {
411 let result =
413 serde_json::to_string(&position).unwrap_or_else(|_| "null".to_string());
414 self.resolve_callback(JsCallbackId(request_id), result);
415 }
416 PluginResponse::BufferLineCount { request_id, count } => {
417 let result = serde_json::to_string(&count).unwrap_or_else(|_| "null".to_string());
419 self.resolve_callback(JsCallbackId(request_id), result);
420 }
421 PluginResponse::TerminalCreated {
422 request_id,
423 buffer_id,
424 terminal_id,
425 split_id,
426 } => {
427 self.track_async_resource(request_id, TrackedAsyncResource::Terminal(terminal_id));
429 let result = serde_json::json!({
430 "bufferId": buffer_id.0,
431 "terminalId": terminal_id.0,
432 "splitId": split_id.map(|s| s.0)
433 });
434 self.resolve_callback(JsCallbackId(request_id), result.to_string());
435 }
436 PluginResponse::SplitByLabel {
437 request_id,
438 split_id,
439 } => {
440 let result = serde_json::to_string(&split_id.map(|s| s.0))
441 .unwrap_or_else(|_| "null".to_string());
442 self.resolve_callback(JsCallbackId(request_id), result);
443 }
444 }
445 }
446
447 fn track_async_resource(&self, request_id: u64, resource: TrackedAsyncResource) {
450 let plugin_name = self
451 .async_resource_owners
452 .lock()
453 .ok()
454 .and_then(|mut owners| owners.remove(&request_id));
455 if let Some(plugin_name) = plugin_name {
456 if let Some(sender) = self.request_sender.as_ref() {
457 let _ = sender.send(PluginRequest::TrackAsyncResource {
458 plugin_name,
459 resource,
460 });
461 }
462 }
463 }
464
465 pub fn load_plugin(&self, path: &Path) -> Result<()> {
467 let (tx, rx) = oneshot::channel();
468 self.request_sender
469 .as_ref()
470 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
471 .send(PluginRequest::LoadPlugin {
472 path: path.to_path_buf(),
473 response: tx,
474 })
475 .map_err(|_| anyhow!("Plugin thread not responding"))?;
476
477 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
478 }
479
480 pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
482 let (tx, rx) = oneshot::channel();
483 let Some(sender) = self.request_sender.as_ref() else {
484 return vec!["Plugin thread shut down".to_string()];
485 };
486 if sender
487 .send(PluginRequest::LoadPluginsFromDir {
488 dir: dir.to_path_buf(),
489 response: tx,
490 })
491 .is_err()
492 {
493 return vec!["Plugin thread not responding".to_string()];
494 }
495
496 rx.recv()
497 .unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
498 }
499
500 pub fn load_plugins_from_dir_with_config(
504 &self,
505 dir: &Path,
506 plugin_configs: &HashMap<String, PluginConfig>,
507 ) -> (Vec<String>, HashMap<String, PluginConfig>) {
508 let (tx, rx) = oneshot::channel();
509 let Some(sender) = self.request_sender.as_ref() else {
510 return (vec!["Plugin thread shut down".to_string()], HashMap::new());
511 };
512 if sender
513 .send(PluginRequest::LoadPluginsFromDirWithConfig {
514 dir: dir.to_path_buf(),
515 plugin_configs: plugin_configs.clone(),
516 response: tx,
517 })
518 .is_err()
519 {
520 return (
521 vec!["Plugin thread not responding".to_string()],
522 HashMap::new(),
523 );
524 }
525
526 rx.recv()
527 .unwrap_or_else(|_| (vec!["Plugin thread closed".to_string()], HashMap::new()))
528 }
529
530 pub fn load_plugin_from_source(
535 &self,
536 source: &str,
537 name: &str,
538 is_typescript: bool,
539 ) -> Result<()> {
540 let (tx, rx) = oneshot::channel();
541 self.request_sender
542 .as_ref()
543 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
544 .send(PluginRequest::LoadPluginFromSource {
545 source: source.to_string(),
546 name: name.to_string(),
547 is_typescript,
548 response: tx,
549 })
550 .map_err(|_| anyhow!("Plugin thread not responding"))?;
551
552 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
553 }
554
555 pub fn unload_plugin(&self, name: &str) -> Result<()> {
557 let (tx, rx) = oneshot::channel();
558 self.request_sender
559 .as_ref()
560 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
561 .send(PluginRequest::UnloadPlugin {
562 name: name.to_string(),
563 response: tx,
564 })
565 .map_err(|_| anyhow!("Plugin thread not responding"))?;
566
567 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
568 }
569
570 pub fn reload_plugin(&self, name: &str) -> Result<()> {
572 let (tx, rx) = oneshot::channel();
573 self.request_sender
574 .as_ref()
575 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
576 .send(PluginRequest::ReloadPlugin {
577 name: name.to_string(),
578 response: tx,
579 })
580 .map_err(|_| anyhow!("Plugin thread not responding"))?;
581
582 rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
583 }
584
585 pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
590 tracing::trace!("execute_action_async: starting action '{}'", action_name);
591 let (tx, rx) = oneshot::channel();
592 self.request_sender
593 .as_ref()
594 .ok_or_else(|| anyhow!("Plugin thread shut down"))?
595 .send(PluginRequest::ExecuteAction {
596 action_name: action_name.to_string(),
597 response: tx,
598 })
599 .map_err(|_| anyhow!("Plugin thread not responding"))?;
600
601 tracing::trace!("execute_action_async: request sent for '{}'", action_name);
602 Ok(rx)
603 }
604
605 pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
611 if let Some(sender) = self.request_sender.as_ref() {
612 let _ = sender.send(PluginRequest::RunHook {
613 hook_name: hook_name.to_string(),
614 args,
615 });
616 }
617 }
618
619 pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
621 let (tx, rx) = oneshot::channel();
622 let Some(sender) = self.request_sender.as_ref() else {
623 return false;
624 };
625 if sender
626 .send(PluginRequest::HasHookHandlers {
627 hook_name: hook_name.to_string(),
628 response: tx,
629 })
630 .is_err()
631 {
632 return false;
633 }
634
635 rx.recv().unwrap_or(false)
636 }
637
638 pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
640 let (tx, rx) = oneshot::channel();
641 let Some(sender) = self.request_sender.as_ref() else {
642 return vec![];
643 };
644 if sender
645 .send(PluginRequest::ListPlugins { response: tx })
646 .is_err()
647 {
648 return vec![];
649 }
650
651 rx.recv().unwrap_or_default()
652 }
653
654 pub fn process_commands(&mut self) -> Vec<PluginCommand> {
659 let mut commands = Vec::new();
660 while let Ok(cmd) = self.command_receiver.try_recv() {
661 commands.push(cmd);
662 }
663 commands
664 }
665
666 pub fn process_commands_until_hook_completed(
676 &mut self,
677 hook_name: &str,
678 timeout: std::time::Duration,
679 ) -> Vec<PluginCommand> {
680 let mut commands = Vec::new();
681 let deadline = std::time::Instant::now() + timeout;
682
683 loop {
684 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
685 if remaining.is_zero() {
686 while let Ok(cmd) = self.command_receiver.try_recv() {
688 if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
689 commands.push(cmd);
690 }
691 }
692 break;
693 }
694
695 match self.command_receiver.recv_timeout(remaining) {
696 Ok(PluginCommand::HookCompleted {
697 hook_name: ref name,
698 }) if name == hook_name => {
699 while let Ok(cmd) = self.command_receiver.try_recv() {
701 if !matches!(&cmd, PluginCommand::HookCompleted { .. }) {
702 commands.push(cmd);
703 }
704 }
705 break;
706 }
707 Ok(PluginCommand::HookCompleted { .. }) => {
708 continue;
710 }
711 Ok(cmd) => {
712 commands.push(cmd);
713 }
714 Err(_) => {
715 break;
717 }
718 }
719 }
720
721 commands
722 }
723
724 pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
726 Arc::clone(&self.state_snapshot)
727 }
728
729 pub fn shutdown(&mut self) {
731 tracing::debug!("PluginThreadHandle::shutdown: starting shutdown");
732
733 if let Ok(mut pending) = self.pending_responses.lock() {
736 if !pending.is_empty() {
737 tracing::warn!(
738 "PluginThreadHandle::shutdown: dropping {} pending responses: {:?}",
739 pending.len(),
740 pending.keys().collect::<Vec<_>>()
741 );
742 pending.clear(); }
744 }
745
746 if let Some(sender) = self.request_sender.as_ref() {
748 tracing::debug!("PluginThreadHandle::shutdown: sending Shutdown request");
749 let _ = sender.send(PluginRequest::Shutdown);
750 }
751
752 tracing::debug!("PluginThreadHandle::shutdown: dropping request_sender to close channel");
755 self.request_sender.take();
756
757 if let Some(handle) = self.thread_handle.take() {
758 tracing::debug!("PluginThreadHandle::shutdown: joining plugin thread");
759 let _ = handle.join();
760 tracing::debug!("PluginThreadHandle::shutdown: plugin thread joined");
761 }
762
763 tracing::debug!("PluginThreadHandle::shutdown: shutdown complete");
764 }
765
766 pub fn resolve_callback(
769 &self,
770 callback_id: fresh_core::api::JsCallbackId,
771 result_json: String,
772 ) {
773 if let Some(sender) = self.request_sender.as_ref() {
774 let _ = sender.send(PluginRequest::ResolveCallback {
775 callback_id,
776 result_json,
777 });
778 }
779 }
780
781 pub fn reject_callback(&self, callback_id: fresh_core::api::JsCallbackId, error: String) {
784 if let Some(sender) = self.request_sender.as_ref() {
785 let _ = sender.send(PluginRequest::RejectCallback { callback_id, error });
786 }
787 }
788
789 pub fn call_streaming_callback(
792 &self,
793 callback_id: fresh_core::api::JsCallbackId,
794 result_json: String,
795 done: bool,
796 ) {
797 if let Some(sender) = self.request_sender.as_ref() {
798 let _ = sender.send(PluginRequest::CallStreamingCallback {
799 callback_id,
800 result_json,
801 done,
802 });
803 }
804 }
805}
806
807impl Drop for PluginThreadHandle {
808 fn drop(&mut self) {
809 self.shutdown();
810 }
811}
812
813fn respond_to_pending(
814 pending_responses: &PendingResponses,
815 response: fresh_core::api::PluginResponse,
816) -> bool {
817 let request_id = match &response {
818 fresh_core::api::PluginResponse::VirtualBufferCreated { request_id, .. } => *request_id,
819 fresh_core::api::PluginResponse::LspRequest { request_id, .. } => *request_id,
820 fresh_core::api::PluginResponse::HighlightsComputed { request_id, .. } => *request_id,
821 fresh_core::api::PluginResponse::BufferText { request_id, .. } => *request_id,
822 fresh_core::api::PluginResponse::CompositeBufferCreated { request_id, .. } => *request_id,
823 fresh_core::api::PluginResponse::LineStartPosition { request_id, .. } => *request_id,
824 fresh_core::api::PluginResponse::LineEndPosition { request_id, .. } => *request_id,
825 fresh_core::api::PluginResponse::BufferLineCount { request_id, .. } => *request_id,
826 fresh_core::api::PluginResponse::TerminalCreated { request_id, .. } => *request_id,
827 fresh_core::api::PluginResponse::SplitByLabel { request_id, .. } => *request_id,
828 };
829
830 let sender = {
831 let mut pending = pending_responses.lock().unwrap();
832 pending.remove(&request_id)
833 };
834
835 if let Some(tx) = sender {
836 let _ = tx.send(response);
837 true
838 } else {
839 false
840 }
841}
842
843#[cfg(test)]
844mod plugin_thread_tests {
845 use super::*;
846 use fresh_core::api::PluginResponse;
847 use serde_json::json;
848 use std::collections::HashMap;
849 use std::sync::{Arc, Mutex};
850 use tokio::sync::oneshot;
851
852 #[test]
853 fn respond_to_pending_sends_lsp_response() {
854 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
855 let (tx, mut rx) = oneshot::channel();
856 pending.lock().unwrap().insert(123, tx);
857
858 respond_to_pending(
859 &pending,
860 PluginResponse::LspRequest {
861 request_id: 123,
862 result: Ok(json!({ "key": "value" })),
863 },
864 );
865
866 let response = rx.try_recv().expect("expected response");
867 match response {
868 PluginResponse::LspRequest { result, .. } => {
869 assert_eq!(result.unwrap(), json!({ "key": "value" }));
870 }
871 _ => panic!("unexpected variant"),
872 }
873
874 assert!(pending.lock().unwrap().is_empty());
875 }
876
877 #[test]
878 fn respond_to_pending_handles_virtual_buffer_created() {
879 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
880 let (tx, mut rx) = oneshot::channel();
881 pending.lock().unwrap().insert(456, tx);
882
883 respond_to_pending(
884 &pending,
885 PluginResponse::VirtualBufferCreated {
886 request_id: 456,
887 buffer_id: fresh_core::BufferId(7),
888 split_id: Some(fresh_core::SplitId(1)),
889 },
890 );
891
892 let response = rx.try_recv().expect("expected response");
893 match response {
894 PluginResponse::VirtualBufferCreated { buffer_id, .. } => {
895 assert_eq!(buffer_id.0, 7);
896 }
897 _ => panic!("unexpected variant"),
898 }
899
900 assert!(pending.lock().unwrap().is_empty());
901 }
902}
903
904async fn plugin_thread_loop(
910 runtime: Rc<RefCell<QuickJsBackend>>,
911 plugins: &mut HashMap<String, TsPluginInfo>,
912 mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
913) {
914 tracing::info!("Plugin thread event loop started");
915
916 let poll_interval = Duration::from_millis(1);
918 let mut has_pending_work = false;
919
920 loop {
921 if crate::backend::has_fatal_js_error() {
925 if let Some(error_msg) = crate::backend::take_fatal_js_error() {
926 tracing::error!(
927 "Fatal JS error detected, terminating plugin thread: {}",
928 error_msg
929 );
930 panic!("Fatal plugin error: {}", error_msg);
931 }
932 }
933
934 tokio::select! {
935 biased; request = request_receiver.recv() => {
938 match request {
939 Some(PluginRequest::ExecuteAction {
940 action_name,
941 response,
942 }) => {
943 let result = runtime.borrow_mut().start_action(&action_name);
946 let _ = response.send(result);
947 has_pending_work = true; }
949 Some(request) => {
950 let should_shutdown =
951 handle_request(request, Rc::clone(&runtime), plugins).await;
952
953 if should_shutdown {
954 break;
955 }
956 has_pending_work = true; }
958 None => {
959 tracing::info!("Plugin thread request channel closed");
961 break;
962 }
963 }
964 }
965
966 _ = tokio::time::sleep(poll_interval), if has_pending_work => {
968 has_pending_work = runtime.borrow_mut().poll_event_loop_once();
969 }
970 }
971 }
972}
973
974#[allow(clippy::await_holding_refcell_ref)]
993async fn run_hook_internal_rc(
994 runtime: Rc<RefCell<QuickJsBackend>>,
995 hook_name: &str,
996 args: &HookArgs,
997) -> Result<()> {
998 let json_start = std::time::Instant::now();
1001 let json_data = fresh_core::hooks::hook_args_to_json(args)?;
1002 tracing::trace!(
1003 hook = hook_name,
1004 json_us = json_start.elapsed().as_micros(),
1005 "hook args serialized"
1006 );
1007
1008 let emit_start = std::time::Instant::now();
1010 runtime.borrow_mut().emit(hook_name, &json_data).await?;
1011 tracing::trace!(
1012 hook = hook_name,
1013 emit_ms = emit_start.elapsed().as_millis(),
1014 "emit completed"
1015 );
1016
1017 Ok(())
1018}
1019
1020async fn handle_request(
1022 request: PluginRequest,
1023 runtime: Rc<RefCell<QuickJsBackend>>,
1024 plugins: &mut HashMap<String, TsPluginInfo>,
1025) -> bool {
1026 match request {
1027 PluginRequest::LoadPlugin { path, response } => {
1028 let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
1029 let _ = response.send(result);
1030 }
1031
1032 PluginRequest::LoadPluginsFromDir { dir, response } => {
1033 let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
1034 let _ = response.send(errors);
1035 }
1036
1037 PluginRequest::LoadPluginsFromDirWithConfig {
1038 dir,
1039 plugin_configs,
1040 response,
1041 } => {
1042 let (errors, discovered) = load_plugins_from_dir_with_config_internal(
1043 Rc::clone(&runtime),
1044 plugins,
1045 &dir,
1046 &plugin_configs,
1047 )
1048 .await;
1049 let _ = response.send((errors, discovered));
1050 }
1051
1052 PluginRequest::LoadPluginFromSource {
1053 source,
1054 name,
1055 is_typescript,
1056 response,
1057 } => {
1058 let result = load_plugin_from_source_internal(
1059 Rc::clone(&runtime),
1060 plugins,
1061 &source,
1062 &name,
1063 is_typescript,
1064 );
1065 let _ = response.send(result);
1066 }
1067
1068 PluginRequest::UnloadPlugin { name, response } => {
1069 let result = unload_plugin_internal(Rc::clone(&runtime), plugins, &name);
1070 let _ = response.send(result);
1071 }
1072
1073 PluginRequest::ReloadPlugin { name, response } => {
1074 let result = reload_plugin_internal(Rc::clone(&runtime), plugins, &name).await;
1075 let _ = response.send(result);
1076 }
1077
1078 PluginRequest::ExecuteAction {
1079 action_name,
1080 response,
1081 } => {
1082 tracing::error!(
1085 "ExecuteAction should be handled in main loop, not here: {}",
1086 action_name
1087 );
1088 let _ = response.send(Err(anyhow::anyhow!(
1089 "Internal error: ExecuteAction in wrong handler"
1090 )));
1091 }
1092
1093 PluginRequest::RunHook { hook_name, args } => {
1094 let hook_start = std::time::Instant::now();
1096 if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
1098 tracing::info!(hook = %hook_name, ?args, "RunHook request received (prompt hook)");
1099 } else {
1100 tracing::trace!(hook = %hook_name, "RunHook request received");
1101 }
1102 if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
1103 let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
1104 tracing::error!("{}", error_msg);
1105 runtime.borrow_mut().send_status(error_msg);
1107 }
1108 runtime.borrow().send_hook_completed(hook_name.clone());
1111 if hook_name == "prompt_confirmed" || hook_name == "prompt_cancelled" {
1112 tracing::info!(
1113 hook = %hook_name,
1114 elapsed_ms = hook_start.elapsed().as_millis(),
1115 "RunHook completed (prompt hook)"
1116 );
1117 } else {
1118 tracing::trace!(
1119 hook = %hook_name,
1120 elapsed_ms = hook_start.elapsed().as_millis(),
1121 "RunHook completed"
1122 );
1123 }
1124 }
1125
1126 PluginRequest::HasHookHandlers {
1127 hook_name,
1128 response,
1129 } => {
1130 let has_handlers = runtime.borrow().has_handlers(&hook_name);
1131 let _ = response.send(has_handlers);
1132 }
1133
1134 PluginRequest::ListPlugins { response } => {
1135 let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
1136 let _ = response.send(plugin_list);
1137 }
1138
1139 PluginRequest::ResolveCallback {
1140 callback_id,
1141 result_json,
1142 } => {
1143 tracing::info!(
1144 "ResolveCallback: resolving callback_id={} with result_json={}",
1145 callback_id,
1146 result_json
1147 );
1148 runtime
1149 .borrow_mut()
1150 .resolve_callback(callback_id, &result_json);
1151 tracing::info!(
1153 "ResolveCallback: done resolving callback_id={}",
1154 callback_id
1155 );
1156 }
1157
1158 PluginRequest::RejectCallback { callback_id, error } => {
1159 runtime.borrow_mut().reject_callback(callback_id, &error);
1160 }
1162
1163 PluginRequest::CallStreamingCallback {
1164 callback_id,
1165 result_json,
1166 done,
1167 } => {
1168 runtime
1169 .borrow_mut()
1170 .call_streaming_callback(callback_id, &result_json, done);
1171 }
1172
1173 PluginRequest::TrackAsyncResource {
1174 plugin_name,
1175 resource,
1176 } => {
1177 let rt = runtime.borrow();
1178 let mut tracked = rt.plugin_tracked_state.borrow_mut();
1179 let state = tracked.entry(plugin_name).or_default();
1180 match resource {
1181 TrackedAsyncResource::VirtualBuffer(buffer_id) => {
1182 state.virtual_buffer_ids.push(buffer_id);
1183 }
1184 TrackedAsyncResource::CompositeBuffer(buffer_id) => {
1185 state.composite_buffer_ids.push(buffer_id);
1186 }
1187 TrackedAsyncResource::Terminal(terminal_id) => {
1188 state.terminal_ids.push(terminal_id);
1189 }
1190 }
1191 }
1192
1193 PluginRequest::Shutdown => {
1194 tracing::info!("Plugin thread received shutdown request");
1195 return true;
1196 }
1197 }
1198
1199 false
1200}
1201
1202#[allow(clippy::await_holding_refcell_ref)]
1210async fn load_plugin_internal(
1211 runtime: Rc<RefCell<QuickJsBackend>>,
1212 plugins: &mut HashMap<String, TsPluginInfo>,
1213 path: &Path,
1214) -> Result<()> {
1215 let plugin_name = path
1216 .file_stem()
1217 .and_then(|s| s.to_str())
1218 .ok_or_else(|| anyhow!("Invalid plugin filename"))?
1219 .to_string();
1220
1221 tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
1222 tracing::debug!(
1223 "load_plugin_internal: starting module load for plugin '{}'",
1224 plugin_name
1225 );
1226
1227 let path_str = path
1229 .to_str()
1230 .ok_or_else(|| anyhow!("Invalid path encoding"))?;
1231
1232 let i18n_path = path.with_extension("i18n.json");
1234 if i18n_path.exists() {
1235 if let Ok(content) = std::fs::read_to_string(&i18n_path) {
1236 if let Ok(strings) = serde_json::from_str::<
1237 std::collections::HashMap<String, std::collections::HashMap<String, String>>,
1238 >(&content)
1239 {
1240 runtime
1241 .borrow_mut()
1242 .services
1243 .register_plugin_strings(&plugin_name, strings);
1244 tracing::debug!("Loaded i18n strings for plugin '{}'", plugin_name);
1245 }
1246 }
1247 }
1248
1249 let load_start = std::time::Instant::now();
1250 runtime
1251 .borrow_mut()
1252 .load_module_with_source(path_str, &plugin_name)
1253 .await?;
1254 let load_elapsed = load_start.elapsed();
1255
1256 tracing::debug!(
1257 "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
1258 plugin_name,
1259 load_elapsed
1260 );
1261
1262 plugins.insert(
1264 plugin_name.clone(),
1265 TsPluginInfo {
1266 name: plugin_name.clone(),
1267 path: path.to_path_buf(),
1268 enabled: true,
1269 },
1270 );
1271
1272 tracing::debug!(
1273 "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
1274 plugin_name,
1275 plugins.len()
1276 );
1277
1278 Ok(())
1279}
1280
1281async fn load_plugins_from_dir_internal(
1283 runtime: Rc<RefCell<QuickJsBackend>>,
1284 plugins: &mut HashMap<String, TsPluginInfo>,
1285 dir: &Path,
1286) -> Vec<String> {
1287 tracing::debug!(
1288 "load_plugins_from_dir_internal: scanning directory {:?}",
1289 dir
1290 );
1291 let mut errors = Vec::new();
1292
1293 if !dir.exists() {
1294 tracing::warn!("Plugin directory does not exist: {:?}", dir);
1295 return errors;
1296 }
1297
1298 match std::fs::read_dir(dir) {
1300 Ok(entries) => {
1301 for entry in entries.flatten() {
1302 let path = entry.path();
1303 let ext = path.extension().and_then(|s| s.to_str());
1304 if ext == Some("ts") || ext == Some("js") {
1305 tracing::debug!(
1306 "load_plugins_from_dir_internal: attempting to load {:?}",
1307 path
1308 );
1309 if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
1310 {
1311 let err = format!("Failed to load {:?}: {}", path, e);
1312 tracing::error!("{}", err);
1313 errors.push(err);
1314 }
1315 }
1316 }
1317
1318 tracing::debug!(
1319 "load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
1320 dir,
1321 errors.len()
1322 );
1323 }
1324 Err(e) => {
1325 let err = format!("Failed to read plugin directory: {}", e);
1326 tracing::error!("{}", err);
1327 errors.push(err);
1328 }
1329 }
1330
1331 errors
1332}
1333
1334async fn load_plugins_from_dir_with_config_internal(
1338 runtime: Rc<RefCell<QuickJsBackend>>,
1339 plugins: &mut HashMap<String, TsPluginInfo>,
1340 dir: &Path,
1341 plugin_configs: &HashMap<String, PluginConfig>,
1342) -> (Vec<String>, HashMap<String, PluginConfig>) {
1343 tracing::debug!(
1344 "load_plugins_from_dir_with_config_internal: scanning directory {:?}",
1345 dir
1346 );
1347 let mut errors = Vec::new();
1348 let mut discovered_plugins: HashMap<String, PluginConfig> = HashMap::new();
1349
1350 if !dir.exists() {
1351 tracing::warn!("Plugin directory does not exist: {:?}", dir);
1352 return (errors, discovered_plugins);
1353 }
1354
1355 let mut plugin_files: Vec<(String, std::path::PathBuf)> = Vec::new();
1357 match std::fs::read_dir(dir) {
1358 Ok(entries) => {
1359 for entry in entries.flatten() {
1360 let path = entry.path();
1361 let ext = path.extension().and_then(|s| s.to_str());
1362 if ext == Some("ts") || ext == Some("js") {
1363 if path.to_string_lossy().contains(".i18n.") {
1365 continue;
1366 }
1367 let plugin_name = path
1369 .file_stem()
1370 .and_then(|s| s.to_str())
1371 .unwrap_or("unknown")
1372 .to_string();
1373 plugin_files.push((plugin_name, path));
1374 }
1375 }
1376 }
1377 Err(e) => {
1378 let err = format!("Failed to read plugin directory: {}", e);
1379 tracing::error!("{}", err);
1380 errors.push(err);
1381 return (errors, discovered_plugins);
1382 }
1383 }
1384
1385 for (plugin_name, path) in plugin_files {
1387 let config = if let Some(existing_config) = plugin_configs.get(&plugin_name) {
1389 PluginConfig {
1391 enabled: existing_config.enabled,
1392 path: Some(path.clone()),
1393 }
1394 } else {
1395 PluginConfig::new_with_path(path.clone())
1397 };
1398
1399 discovered_plugins.insert(plugin_name.clone(), config.clone());
1401
1402 if config.enabled {
1404 tracing::debug!(
1405 "load_plugins_from_dir_with_config_internal: loading enabled plugin '{}'",
1406 plugin_name
1407 );
1408 if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await {
1409 let err = format!("Failed to load {:?}: {}", path, e);
1410 tracing::error!("{}", err);
1411 errors.push(err);
1412 }
1413 } else {
1414 tracing::info!(
1415 "load_plugins_from_dir_with_config_internal: skipping disabled plugin '{}'",
1416 plugin_name
1417 );
1418 }
1419 }
1420
1421 tracing::debug!(
1422 "load_plugins_from_dir_with_config_internal: finished. Discovered {} plugins, {} errors",
1423 discovered_plugins.len(),
1424 errors.len()
1425 );
1426
1427 (errors, discovered_plugins)
1428}
1429
1430fn load_plugin_from_source_internal(
1435 runtime: Rc<RefCell<QuickJsBackend>>,
1436 plugins: &mut HashMap<String, TsPluginInfo>,
1437 source: &str,
1438 name: &str,
1439 is_typescript: bool,
1440) -> Result<()> {
1441 if plugins.contains_key(name) {
1443 tracing::info!(
1444 "Hot-reloading buffer plugin '{}' — unloading previous version",
1445 name
1446 );
1447 unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1448 }
1449
1450 tracing::info!("Loading plugin from source: {}", name);
1451
1452 runtime
1453 .borrow_mut()
1454 .execute_source(source, name, is_typescript)?;
1455
1456 plugins.insert(
1458 name.to_string(),
1459 TsPluginInfo {
1460 name: name.to_string(),
1461 path: PathBuf::from(format!("<buffer:{}>", name)),
1462 enabled: true,
1463 },
1464 );
1465
1466 tracing::info!(
1467 "Buffer plugin '{}' loaded successfully, total plugins: {}",
1468 name,
1469 plugins.len()
1470 );
1471
1472 Ok(())
1473}
1474
1475fn unload_plugin_internal(
1477 runtime: Rc<RefCell<QuickJsBackend>>,
1478 plugins: &mut HashMap<String, TsPluginInfo>,
1479 name: &str,
1480) -> Result<()> {
1481 if plugins.remove(name).is_some() {
1482 tracing::info!("Unloading TypeScript plugin: {}", name);
1483
1484 runtime
1486 .borrow_mut()
1487 .services
1488 .unregister_plugin_strings(name);
1489
1490 runtime
1492 .borrow()
1493 .services
1494 .unregister_commands_by_plugin(name);
1495
1496 runtime.borrow().cleanup_plugin(name);
1498
1499 Ok(())
1500 } else {
1501 Err(anyhow!("Plugin '{}' not found", name))
1502 }
1503}
1504
1505async fn reload_plugin_internal(
1507 runtime: Rc<RefCell<QuickJsBackend>>,
1508 plugins: &mut HashMap<String, TsPluginInfo>,
1509 name: &str,
1510) -> Result<()> {
1511 let path = plugins
1512 .get(name)
1513 .ok_or_else(|| anyhow!("Plugin '{}' not found", name))?
1514 .path
1515 .clone();
1516
1517 unload_plugin_internal(Rc::clone(&runtime), plugins, name)?;
1518 load_plugin_internal(runtime, plugins, &path).await?;
1519
1520 Ok(())
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525 use super::*;
1526 use fresh_core::hooks::hook_args_to_json;
1527
1528 #[test]
1529 fn test_oneshot_channel() {
1530 let (tx, rx) = oneshot::channel::<i32>();
1531 assert!(tx.send(42).is_ok());
1532 assert_eq!(rx.recv().unwrap(), 42);
1533 }
1534
1535 #[test]
1536 fn test_hook_args_to_json_editor_initialized() {
1537 let args = HookArgs::EditorInitialized;
1538 let json = hook_args_to_json(&args).unwrap();
1539 assert_eq!(json, serde_json::json!({}));
1540 }
1541
1542 #[test]
1543 fn test_hook_args_to_json_prompt_changed() {
1544 let args = HookArgs::PromptChanged {
1545 prompt_type: "search".to_string(),
1546 input: "test".to_string(),
1547 };
1548 let json = hook_args_to_json(&args).unwrap();
1549 assert_eq!(json["prompt_type"], "search");
1550 assert_eq!(json["input"], "test");
1551 }
1552}