use crate::input::command_registry::CommandRegistry;
use crate::services::plugins::api::{EditorStateSnapshot, PluginCommand};
use crate::services::plugins::hooks::{hook_args_to_json, HookArgs};
use crate::services::plugins::runtime::{TsPluginInfo, TypeScriptRuntime};
use anyhow::{anyhow, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
/// Messages sent from `PluginThreadHandle` to the plugin thread's event loop.
///
/// Variants that carry a `response` sender expect exactly one reply on it;
/// variants without one are fire-and-forget.
#[derive(Debug)]
pub enum PluginRequest {
/// Load a single plugin from `path` and reply with the outcome.
LoadPlugin {
path: PathBuf,
response: oneshot::Sender<Result<()>>,
},
/// Load every `.ts` / `.js` file found directly in `dir`; the reply is the
/// list of error messages collected along the way (empty on full success).
LoadPluginsFromDir {
dir: PathBuf,
response: oneshot::Sender<Vec<String>>,
},
/// Remove the plugin registered under `name` and reply with the outcome.
UnloadPlugin {
name: String,
response: oneshot::Sender<Result<()>>,
},
/// Unload the plugin registered under `name`, then load it again from the
/// path it was originally loaded from.
ReloadPlugin {
name: String,
response: oneshot::Sender<Result<()>>,
},
/// Execute the named action in the runtime. This variant is handled
/// directly by `plugin_thread_loop`, not by `handle_request`.
ExecuteAction {
action_name: String,
response: oneshot::Sender<Result<()>>,
},
/// Emit a hook to the runtime. No reply is sent; failures are logged and
/// forwarded through the runtime's `send_status`.
RunHook { hook_name: String, args: HookArgs },
/// Ask whether the runtime reports any handlers for `hook_name`.
HasHookHandlers {
hook_name: String,
response: oneshot::Sender<bool>,
},
/// Reply with a copy of the info for every plugin currently tracked.
ListPlugins {
response: oneshot::Sender<Vec<TsPluginInfo>>,
},
/// Ask the event loop to exit.
Shutdown,
}
/// A minimal blocking one-shot channel layered on `std::sync::mpsc`.
///
/// `Sender::send` consumes the sender, so at most one value ever travels
/// through a channel created by [`channel`].
pub mod oneshot {
    use std::fmt::{self, Debug, Formatter};
    use std::sync::mpsc::{self, RecvError, RecvTimeoutError, SyncSender, TryRecvError};
    use std::time::Duration;

    /// Sending half of the channel; used up by a single [`Sender::send`].
    pub struct Sender<T>(SyncSender<T>);

    /// Receiving half of the channel.
    pub struct Receiver<T>(mpsc::Receiver<T>);

    // Manual `Debug` impls so that `T` does not have to be `Debug` itself.
    impl<T> Debug for Sender<T> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Sender").finish()
        }
    }

    impl<T> Debug for Receiver<T> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            f.debug_tuple("Receiver").finish()
        }
    }

    impl<T> Sender<T> {
        /// Delivers `value` to the receiver.
        ///
        /// Hands the value back as `Err(value)` when the receiver is gone.
        pub fn send(self, value: T) -> Result<(), T> {
            match self.0.send(value) {
                Ok(()) => Ok(()),
                Err(mpsc::SendError(rejected)) => Err(rejected),
            }
        }
    }

    impl<T> Receiver<T> {
        /// Blocks until the value arrives or the sender is dropped.
        pub fn recv(self) -> Result<T, RecvError> {
            let Receiver(inner) = self;
            inner.recv()
        }

        /// Like [`Receiver::recv`], but gives up once `timeout` has elapsed.
        pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError> {
            let Receiver(inner) = self;
            inner.recv_timeout(timeout)
        }

        /// Non-blocking poll; the receiver stays usable afterwards.
        pub fn try_recv(&self) -> Result<T, TryRecvError> {
            self.0.try_recv()
        }
    }

    /// Creates a connected sender/receiver pair.
    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
        // One buffered slot lets the single `send` complete without waiting
        // for the receiver to be ready.
        let (raw_tx, raw_rx) = mpsc::sync_channel(1);
        (Sender(raw_tx), Receiver(raw_rx))
    }
}
/// Owner-side handle to the dedicated plugin thread.
///
/// Requests travel to the thread over `request_sender`; dropping the handle
/// calls `shutdown`, which asks the thread to exit and joins it.
pub struct PluginThreadHandle {
// Channel used to push `PluginRequest`s to the plugin thread's event loop.
request_sender: tokio::sync::mpsc::UnboundedSender<PluginRequest>,
// Join handle of the plugin thread; `None` once `shutdown` has taken it.
thread_handle: Option<JoinHandle<()>>,
// Editor state snapshot; a clone of this `Arc` is handed to the TypeScript runtime.
state_snapshot: Arc<RwLock<EditorStateSnapshot>>,
// Command registry; a clone is given to the plugin thread, which uses it
// when unloading a plugin.
commands: Arc<RwLock<CommandRegistry>>,
// Senders keyed by request id, consumed by `deliver_response`; a clone of
// this map is also handed to the TypeScript runtime.
pending_responses: crate::services::plugins::runtime::PendingResponses,
// Receiving end of the `PluginCommand` channel whose sender is given to the
// TypeScript runtime; drained by `process_commands`.
command_receiver: std::sync::mpsc::Receiver<PluginCommand>,
}
impl PluginThreadHandle {
pub fn spawn(commands: Arc<RwLock<CommandRegistry>>) -> Result<Self> {
tracing::debug!("PluginThreadHandle::spawn: starting plugin thread creation");
let (command_sender, command_receiver) = std::sync::mpsc::channel();
let state_snapshot = Arc::new(RwLock::new(EditorStateSnapshot::new()));
let pending_responses: crate::services::plugins::runtime::PendingResponses =
Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
let thread_pending_responses = Arc::clone(&pending_responses);
let (request_sender, request_receiver) = tokio::sync::mpsc::unbounded_channel();
let thread_state_snapshot = Arc::clone(&state_snapshot);
let thread_commands = Arc::clone(&commands);
tracing::debug!("PluginThreadHandle::spawn: spawning OS thread for plugin runtime");
let thread_handle = thread::spawn(move || {
tracing::debug!("Plugin thread: OS thread started, creating tokio runtime");
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => {
tracing::debug!("Plugin thread: tokio runtime created successfully");
rt
}
Err(e) => {
tracing::error!("Failed to create plugin thread runtime: {}", e);
return;
}
};
tracing::debug!("Plugin thread: creating TypeScript runtime (V8 initialization)");
let runtime = match TypeScriptRuntime::with_state_and_responses(
Arc::clone(&thread_state_snapshot),
command_sender,
thread_pending_responses,
) {
Ok(rt) => {
tracing::debug!("Plugin thread: TypeScript runtime created successfully");
rt
}
Err(e) => {
tracing::error!("Failed to create TypeScript runtime: {}", e);
return;
}
};
let mut plugins: HashMap<String, TsPluginInfo> = HashMap::new();
tracing::debug!("Plugin thread: starting event loop with LocalSet");
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async {
let runtime = Rc::new(RefCell::new(runtime));
tracing::debug!("Plugin thread: entering plugin_thread_loop");
plugin_thread_loop(runtime, &mut plugins, &thread_commands, request_receiver).await;
});
tracing::info!("Plugin thread shutting down");
});
tracing::debug!("PluginThreadHandle::spawn: OS thread spawned, returning handle");
tracing::info!("Plugin thread spawned");
Ok(Self {
request_sender,
thread_handle: Some(thread_handle),
state_snapshot,
commands,
pending_responses,
command_receiver,
})
}
pub fn deliver_response(&self, response: crate::services::plugins::api::PluginResponse) {
respond_to_pending(&self.pending_responses, response);
}
pub fn load_plugin(&self, path: &Path) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.request_sender
.send(PluginRequest::LoadPlugin {
path: path.to_path_buf(),
response: tx,
})
.map_err(|_| anyhow!("Plugin thread not responding"))?;
rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
}
pub fn load_plugins_from_dir(&self, dir: &Path) -> Vec<String> {
let (tx, rx) = oneshot::channel();
if self
.request_sender
.send(PluginRequest::LoadPluginsFromDir {
dir: dir.to_path_buf(),
response: tx,
})
.is_err()
{
return vec!["Plugin thread not responding".to_string()];
}
rx.recv()
.unwrap_or_else(|_| vec!["Plugin thread closed".to_string()])
}
pub fn unload_plugin(&self, name: &str) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.request_sender
.send(PluginRequest::UnloadPlugin {
name: name.to_string(),
response: tx,
})
.map_err(|_| anyhow!("Plugin thread not responding"))?;
rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
}
pub fn reload_plugin(&self, name: &str) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.request_sender
.send(PluginRequest::ReloadPlugin {
name: name.to_string(),
response: tx,
})
.map_err(|_| anyhow!("Plugin thread not responding"))?;
rx.recv().map_err(|_| anyhow!("Plugin thread closed"))?
}
pub fn execute_action_async(&self, action_name: &str) -> Result<oneshot::Receiver<Result<()>>> {
tracing::trace!("execute_action_async: starting action '{}'", action_name);
let (tx, rx) = oneshot::channel();
self.request_sender
.send(PluginRequest::ExecuteAction {
action_name: action_name.to_string(),
response: tx,
})
.map_err(|_| anyhow!("Plugin thread not responding"))?;
tracing::trace!("execute_action_async: request sent for '{}'", action_name);
Ok(rx)
}
pub fn run_hook(&self, hook_name: &str, args: HookArgs) {
let _ = self.request_sender.send(PluginRequest::RunHook {
hook_name: hook_name.to_string(),
args,
});
}
pub fn has_hook_handlers(&self, hook_name: &str) -> bool {
let (tx, rx) = oneshot::channel();
if self
.request_sender
.send(PluginRequest::HasHookHandlers {
hook_name: hook_name.to_string(),
response: tx,
})
.is_err()
{
return false;
}
rx.recv().unwrap_or(false)
}
pub fn list_plugins(&self) -> Vec<TsPluginInfo> {
let (tx, rx) = oneshot::channel();
if self
.request_sender
.send(PluginRequest::ListPlugins { response: tx })
.is_err()
{
return vec![];
}
rx.recv().unwrap_or_default()
}
pub fn process_commands(&mut self) -> Vec<PluginCommand> {
let mut commands = Vec::new();
while let Ok(cmd) = self.command_receiver.try_recv() {
commands.push(cmd);
}
commands
}
pub fn state_snapshot_handle(&self) -> Arc<RwLock<EditorStateSnapshot>> {
Arc::clone(&self.state_snapshot)
}
#[allow(dead_code)]
pub fn command_registry(&self) -> Arc<RwLock<CommandRegistry>> {
Arc::clone(&self.commands)
}
pub fn shutdown(&mut self) {
let _ = self.request_sender.send(PluginRequest::Shutdown);
if let Some(handle) = self.thread_handle.take() {
let _ = handle.join();
}
}
}
impl Drop for PluginThreadHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Hands `response` to the waiter registered under its request id.
///
/// The matching entry is removed from `pending_responses` first. If no entry
/// exists, or the waiter has already gone away, the response is dropped.
///
/// # Panics
///
/// Panics if the `pending_responses` mutex is poisoned.
fn respond_to_pending(
    pending_responses: &crate::services::plugins::runtime::PendingResponses,
    response: crate::services::plugins::api::PluginResponse,
) {
    use crate::services::plugins::api::PluginResponse;

    // Both variants carry the id of the request they answer.
    let request_id = match &response {
        PluginResponse::VirtualBufferCreated { request_id, .. }
        | PluginResponse::LspRequest { request_id, .. } => *request_id,
    };

    // The lock guard is a temporary of this statement, so the mutex is
    // released before the response is sent.
    let waiter = pending_responses.lock().unwrap().remove(&request_id);

    if let Some(tx) = waiter {
        let _ = tx.send(response);
    }
}
#[cfg(test)]
mod plugin_thread_tests {
    use super::*;
    use crate::services::plugins::api::PluginResponse;
    use crate::services::plugins::runtime::PendingResponses;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tokio::sync::oneshot;

    /// An LSP response reaches the registered waiter and clears its entry.
    #[test]
    fn respond_to_pending_sends_lsp_response() {
        let registry: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (waiter_tx, mut waiter_rx) = oneshot::channel();
        registry.lock().unwrap().insert(123, waiter_tx);

        let outgoing = PluginResponse::LspRequest {
            request_id: 123,
            result: Ok(json!({ "key": "value" })),
        };
        respond_to_pending(&registry, outgoing);

        let delivered = waiter_rx.try_recv().expect("expected response");
        if let PluginResponse::LspRequest { result, .. } = delivered {
            assert_eq!(result.unwrap(), json!({ "key": "value" }));
        } else {
            panic!("unexpected variant");
        }
        assert!(registry.lock().unwrap().is_empty());
    }

    /// A virtual-buffer response reaches the registered waiter and clears its entry.
    #[test]
    fn respond_to_pending_handles_virtual_buffer_created() {
        let registry: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
        let (waiter_tx, mut waiter_rx) = oneshot::channel();
        registry.lock().unwrap().insert(456, waiter_tx);

        let outgoing = PluginResponse::VirtualBufferCreated {
            request_id: 456,
            buffer_id: crate::model::event::BufferId(7),
            split_id: Some(crate::model::event::SplitId(1)),
        };
        respond_to_pending(&registry, outgoing);

        let delivered = waiter_rx.try_recv().expect("expected response");
        if let PluginResponse::VirtualBufferCreated { buffer_id, .. } = delivered {
            assert_eq!(buffer_id.0, 7);
        } else {
            panic!("unexpected variant");
        }
        assert!(registry.lock().unwrap().is_empty());
    }
}
/// Event loop of the plugin thread.
///
/// Pulls requests off `request_receiver` one at a time. `ExecuteAction` is
/// dispatched straight to `execute_action_with_hooks`; everything else goes
/// through `handle_request`. The loop ends when a handler asks for shutdown
/// or when every sender has been dropped.
async fn plugin_thread_loop(
    runtime: Rc<RefCell<TypeScriptRuntime>>,
    plugins: &mut HashMap<String, TsPluginInfo>,
    commands: &Arc<RwLock<CommandRegistry>>,
    mut request_receiver: tokio::sync::mpsc::UnboundedReceiver<PluginRequest>,
) {
    tracing::info!("Plugin thread event loop started");

    while let Some(incoming) = request_receiver.recv().await {
        let stop = match incoming {
            PluginRequest::ExecuteAction {
                action_name,
                response,
            } => {
                execute_action_with_hooks(&action_name, response, Rc::clone(&runtime)).await;
                false
            }
            other => handle_request(other, Rc::clone(&runtime), plugins, commands).await,
        };
        if stop {
            // Explicit shutdown: leave without the "channel closed" message.
            return;
        }
    }

    // Reached only when `recv` yielded `None`, i.e. all senders are gone.
    tracing::info!("Plugin thread request channel closed");
}
/// Runs `action_name` in the runtime and reports the result on `response`.
///
/// The runtime stays mutably borrowed for the whole duration of the action.
/// A failed send (the requester stopped listening) is ignored.
async fn execute_action_with_hooks(
    action_name: &str,
    response: oneshot::Sender<Result<()>>,
    runtime: Rc<RefCell<TypeScriptRuntime>>,
) {
    tracing::trace!(
        "execute_action_with_hooks: starting action '{}'",
        action_name
    );

    let outcome = runtime.borrow_mut().execute_action(action_name).await;
    let succeeded = outcome.is_ok();

    tracing::trace!(
        "execute_action_with_hooks: action '{}' completed with result: {:?}",
        action_name,
        succeeded
    );

    let _ = response.send(outcome);
}
/// Serialises `args` to JSON and emits them to the runtime under `hook_name`.
///
/// # Errors
///
/// Propagates a serialisation failure or an error returned by the runtime's
/// `emit`; whatever `emit` yields on success is discarded.
async fn run_hook_internal_rc(
    runtime: Rc<RefCell<TypeScriptRuntime>>,
    hook_name: &str,
    args: &HookArgs,
) -> Result<()> {
    let payload = hook_args_to_json(args)?;
    let mut rt = runtime.borrow_mut();
    rt.emit(hook_name, &payload).await?;
    Ok(())
}
async fn handle_request(
request: PluginRequest,
runtime: Rc<RefCell<TypeScriptRuntime>>,
plugins: &mut HashMap<String, TsPluginInfo>,
commands: &Arc<RwLock<CommandRegistry>>,
) -> bool {
match request {
PluginRequest::LoadPlugin { path, response } => {
let result = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await;
let _ = response.send(result);
}
PluginRequest::LoadPluginsFromDir { dir, response } => {
let errors = load_plugins_from_dir_internal(Rc::clone(&runtime), plugins, &dir).await;
let _ = response.send(errors);
}
PluginRequest::UnloadPlugin { name, response } => {
let result = unload_plugin_internal(plugins, commands, &name);
let _ = response.send(result);
}
PluginRequest::ReloadPlugin { name, response } => {
let result =
reload_plugin_internal(Rc::clone(&runtime), plugins, commands, &name).await;
let _ = response.send(result);
}
PluginRequest::ExecuteAction {
action_name,
response,
} => {
tracing::error!(
"ExecuteAction should be handled in main loop, not here: {}",
action_name
);
let _ = response.send(Err(anyhow::anyhow!(
"Internal error: ExecuteAction in wrong handler"
)));
}
PluginRequest::RunHook { hook_name, args } => {
if let Err(e) = run_hook_internal_rc(Rc::clone(&runtime), &hook_name, &args).await {
let error_msg = format!("Plugin error in '{}': {}", hook_name, e);
tracing::error!("{}", error_msg);
runtime.borrow_mut().send_status(error_msg);
}
}
PluginRequest::HasHookHandlers {
hook_name,
response,
} => {
let has_handlers = runtime.borrow().has_handlers(&hook_name);
let _ = response.send(has_handlers);
}
PluginRequest::ListPlugins { response } => {
let plugin_list: Vec<TsPluginInfo> = plugins.values().cloned().collect();
let _ = response.send(plugin_list);
}
PluginRequest::Shutdown => {
tracing::info!("Plugin thread received shutdown request");
return true;
}
}
false
}
/// Loads the module at `path` into the runtime and records it in `plugins`.
///
/// The plugin's name is the file stem of `path`. An existing entry with the
/// same name is overwritten.
///
/// # Errors
///
/// Fails when the file stem or the full path is not valid UTF-8, or when the
/// runtime rejects the module; in those cases `plugins` is left untouched.
async fn load_plugin_internal(
    runtime: Rc<RefCell<TypeScriptRuntime>>,
    plugins: &mut HashMap<String, TsPluginInfo>,
    path: &Path,
) -> Result<()> {
    let plugin_name = match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_string(),
        None => return Err(anyhow!("Invalid plugin filename")),
    };

    tracing::info!("Loading TypeScript plugin: {} from {:?}", plugin_name, path);
    tracing::debug!(
        "load_plugin_internal: starting module load for plugin '{}'",
        plugin_name
    );

    let path_str = match path.to_str() {
        Some(text) => text,
        None => return Err(anyhow!("Invalid path encoding")),
    };

    // Time only the module load itself.
    let started = std::time::Instant::now();
    runtime
        .borrow_mut()
        .load_module_with_source(path_str, &plugin_name)
        .await?;
    let took = started.elapsed();

    tracing::debug!(
        "load_plugin_internal: plugin '{}' loaded successfully in {:?}",
        plugin_name,
        took
    );

    let info = TsPluginInfo {
        name: plugin_name.clone(),
        path: path.to_path_buf(),
        enabled: true,
    };
    plugins.insert(plugin_name.clone(), info);

    tracing::debug!(
        "load_plugin_internal: plugin '{}' registered, total plugins loaded: {}",
        plugin_name,
        plugins.len()
    );
    Ok(())
}
async fn load_plugins_from_dir_internal(
runtime: Rc<RefCell<TypeScriptRuntime>>,
plugins: &mut HashMap<String, TsPluginInfo>,
dir: &Path,
) -> Vec<String> {
tracing::debug!(
"load_plugins_from_dir_internal: scanning directory {:?}",
dir
);
let mut errors = Vec::new();
if !dir.exists() {
tracing::warn!("Plugin directory does not exist: {:?}", dir);
return errors;
}
match std::fs::read_dir(dir) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
let ext = path.extension().and_then(|s| s.to_str());
if ext == Some("ts") || ext == Some("js") {
tracing::debug!(
"load_plugins_from_dir_internal: attempting to load {:?}",
path
);
if let Err(e) = load_plugin_internal(Rc::clone(&runtime), plugins, &path).await
{
let err = format!("Failed to load {:?}: {}", path, e);
tracing::error!("{}", err);
errors.push(err);
}
}
}
tracing::debug!(
"load_plugins_from_dir_internal: finished loading from {:?}, {} errors",
dir,
errors.len()
);
}
Err(e) => {
let err = format!("Failed to read plugin directory: {}", e);
tracing::error!("{}", err);
errors.push(err);
}
}
errors
}
/// Forgets the plugin `name` and unregisters its commands.
///
/// Commands are matched by the prefix `"<name>:"`. Only the bookkeeping in
/// `plugins` and the command registry is touched here.
///
/// # Errors
///
/// Fails when no plugin is registered under `name`.
///
/// # Panics
///
/// Panics if the command registry lock is poisoned.
fn unload_plugin_internal(
    plugins: &mut HashMap<String, TsPluginInfo>,
    commands: &Arc<RwLock<CommandRegistry>>,
    name: &str,
) -> Result<()> {
    if plugins.remove(name).is_none() {
        return Err(anyhow!("Plugin '{}' not found", name));
    }

    tracing::info!("Unloading TypeScript plugin: {}", name);
    let command_prefix = format!("{}:", name);
    let registry = commands.read().unwrap();
    registry.unregister_by_prefix(&command_prefix);
    Ok(())
}
/// Unloads the plugin `name` and loads it again from its recorded path.
///
/// The unload happens first; if the subsequent load fails, the plugin is not
/// re-added to `plugins` and stays unloaded.
///
/// # Errors
///
/// Fails when `name` is unknown, or when unloading or loading fails.
async fn reload_plugin_internal(
    runtime: Rc<RefCell<TypeScriptRuntime>>,
    plugins: &mut HashMap<String, TsPluginInfo>,
    commands: &Arc<RwLock<CommandRegistry>>,
    name: &str,
) -> Result<()> {
    // Copy the path out before the entry is removed by the unload.
    let source_path = match plugins.get(name) {
        Some(info) => info.path.clone(),
        None => return Err(anyhow!("Plugin '{}' not found", name)),
    };

    unload_plugin_internal(plugins, commands, name)?;
    load_plugin_internal(runtime, plugins, &source_path).await
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent through the one-shot channel comes out the other end.
    #[test]
    fn test_oneshot_channel() {
        let (sender, receiver) = oneshot::channel::<i32>();
        let sent = sender.send(42);
        assert!(sent.is_ok());
        let received = receiver.recv().unwrap();
        assert_eq!(received, 42);
    }

    /// `EditorInitialized` carries no data and serialises to an empty object.
    #[test]
    fn test_hook_args_to_json_editor_initialized() {
        let serialized = hook_args_to_json(&HookArgs::EditorInitialized).unwrap();
        assert_eq!(serialized, "{}");
    }

    /// `PromptChanged` serialises both of its fields under their own names.
    #[test]
    fn test_hook_args_to_json_prompt_changed() {
        let hook_args = HookArgs::PromptChanged {
            prompt_type: "search".to_string(),
            input: "test".to_string(),
        };
        let serialized = hook_args_to_json(&hook_args).unwrap();
        let value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(value["prompt_type"], "search");
        assert_eq!(value["input"], "test");
    }
}