use crate::error::{PluginManagerError, PluginManagerResult};
use crate::manager::loader::load_plugin;
use crate::manager::load_ops::insert_child_plugin;
use crate::manager::load_ops::insert_plugin_instance;
use crate::manager::types::PluginMap;
use crate::manager::unload_ops::perform_unload;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use tracing_shared::SharedLogger;
pub async fn watch_plugin_directory(
plugins: PluginMap,
plugin_dir: &str,
temp_dir: &str,
ed25519_public_key_path: Option<String>,
rsa_private_key_path: Option<String>,
library_path: Option<&String>,
_timeout_secs: u64,
mut stop_rx: mpsc::Receiver<()>,
) -> PluginManagerResult<()> {
trace!("开始监听插件目录: {}", plugin_dir);
info!("开始监听插件目录: {}", plugin_dir);
let plugin_dir_path = Path::new(plugin_dir);
if !plugin_dir_path.exists() {
error!("插件目录不存在: {}", plugin_dir);
return Err(PluginManagerError::ConfigError(format!(
"插件目录不存在: {}",
plugin_dir
)));
}
debug!("插件目录存在: {}", plugin_dir);
let (tx, mut rx) = mpsc::channel::<Result<Event, notify::Error>>(100);
trace!("文件系统事件通道已创建");
let mut watcher = RecommendedWatcher::new(
move |res| {
if let Err(_) = tx.blocking_send(res) {
warn!("文件系统事件通道已关闭");
}
},
Config::default(),
)
.map_err(|e| {
error!("创建文件系统监听器失败: {}", e);
PluginManagerError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("创建文件系统监听器失败: {}", e),
))
})?;
debug!("文件系统监听器创建成功");
watcher
.watch(plugin_dir_path, RecursiveMode::NonRecursive)
.map_err(|e| {
error!("开始监听目录失败: {} - {}", plugin_dir, e);
PluginManagerError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!("开始监听目录失败: {}", e),
))
})?;
info!("插件目录监听已启动: {}", plugin_dir);
debug!("监听器已开始工作");
let mut pending_events: HashMap<PathBuf, (EventKind, Instant)> = HashMap::new();
let debounce_duration = Duration::from_millis(500); debug!("防抖机制已启用,延迟: {}ms", debounce_duration.as_millis());
loop {
tokio::select! {
_ = stop_rx.recv() => {
info!("收到停止信号,停止监听插件目录");
trace!("监听循环退出");
break;
}
event_result = rx.recv() => {
match event_result {
Some(Ok(event)) => {
trace!("收到文件系统事件: {:?}", event.kind);
for path in &event.paths {
if let Some(extension) = path.extension() {
if extension == "spk" {
trace!("检测到 .spk 文件事件: {} - {:?}", path.display(), event.kind);
pending_events.insert(path.clone(), (event.kind.clone(), Instant::now()));
}
}
}
}
Some(Err(e)) => {
error!("文件系统监听错误: {}", e);
}
None => {
warn!("文件系统事件通道已关闭");
break;
}
}
}
}
let now = Instant::now();
let mut events_to_process: Vec<(PathBuf, EventKind)> = Vec::new();
pending_events.retain(|path, (kind, timestamp)| {
if now.duration_since(*timestamp) >= debounce_duration {
events_to_process.push((path.clone(), kind.clone()));
false } else {
true }
});
if !events_to_process.is_empty() {
debug!("处理 {} 个防抖后的事件", events_to_process.len());
}
for (path, kind) in events_to_process {
trace!("处理文件系统事件: {} - {:?}", path.display(), kind);
let plugins_clone = plugins.clone();
let plugin_dir_str = plugin_dir.to_string();
let temp_dir_str = temp_dir.to_string();
let ed25519_key = ed25519_public_key_path.clone();
let rsa_key = rsa_private_key_path.clone();
let library_path_clone = library_path.map(|s| s.clone());
tokio::spawn(async move {
if let Err(e) = handle_file_system_event(
&plugins_clone,
&plugin_dir_str,
&temp_dir_str,
ed25519_key.as_ref(),
rsa_key.as_ref(),
library_path_clone.as_ref(),
&path,
&kind,
).await {
error!("处理文件系统事件失败: {} - {}", path.display(), e);
}
});
}
sleep(Duration::from_millis(100)).await;
}
info!("插件目录监听已停止");
trace!("监听任务完成");
Ok(())
}
async fn handle_file_system_event(
plugins: &PluginMap,
plugin_dir: &str,
temp_dir: &str,
ed25519_public_key_path: Option<&String>,
rsa_private_key_path: Option<&String>,
library_path: Option<&String>,
path: &Path,
kind: &EventKind,
) -> PluginManagerResult<()> {
trace!("处理文件系统事件: {} - {:?}", path.display(), kind);
match kind {
EventKind::Create(_) => {
info!("检测到新插件文件: {}", path.display());
debug!("触发插件创建处理流程");
handle_plugin_create(
plugins,
path,
plugin_dir,
temp_dir,
ed25519_public_key_path,
rsa_private_key_path,
library_path,
)
.await
}
EventKind::Remove(_) => {
info!("检测到插件文件删除: {}", path.display());
debug!("触发插件删除处理流程");
handle_plugin_remove(plugins, path).await
}
EventKind::Modify(_) => {
info!("检测到插件文件修改: {}", path.display());
debug!("触发插件修改处理流程(先删除再创建)");
handle_plugin_remove(plugins, path).await?;
handle_plugin_create(
plugins,
path,
plugin_dir,
temp_dir,
ed25519_public_key_path,
rsa_private_key_path,
library_path,
)
.await
}
_ => {
trace!("忽略事件类型: {:?}", kind);
Ok(())
}
}
}
async fn handle_plugin_create(
plugins: &PluginMap,
plugin_path: &Path,
plugin_dir: &str,
temp_dir: &str,
ed25519_public_key_path: Option<&String>,
rsa_private_key_path: Option<&String>,
library_path: Option<&String>,
) -> PluginManagerResult<()> {
trace!("处理插件创建事件: {}", plugin_path.display());
let plugin_file = plugin_path
.to_str()
.ok_or_else(|| {
error!("插件文件路径无效: {}", plugin_path.display());
PluginManagerError::LoadFailed("插件文件路径无效".to_string())
})?;
if !plugin_path.exists() {
warn!("插件文件不存在,跳过加载: {}", plugin_file);
return Ok(());
}
debug!("插件文件存在,等待文件写入完成");
sleep(Duration::from_millis(100)).await;
if !plugin_path.exists() {
warn!("插件文件在等待后仍不存在,跳过加载: {}", plugin_file);
return Ok(());
}
trace!("文件写入完成,开始加载插件");
let logger = SharedLogger::new();
match load_plugin(
plugin_file,
ed25519_public_key_path.cloned(),
rsa_private_key_path.cloned(),
temp_dir,
library_path,
logger,
)
.await
{
Ok(instance) => {
let plugin_id = instance.metadata.id.clone();
debug!("插件加载成功: {} (ID: {})", instance.metadata.name, plugin_id);
{
let plugins_guard = plugins.read().await;
if plugins_guard.contains_key(&plugin_id) {
warn!("插件 {} 已存在,跳过加载", plugin_id);
return Ok(());
}
}
if instance.metadata.is_sub_plugin {
debug!("这是子插件,挂载到父插件");
if let Err(e) = insert_child_plugin(plugins.clone(), instance).await {
error!("挂载子插件失败: {}", e);
return Err(e);
}
} else {
debug!("这是主插件,直接插入");
if let Err(e) = insert_plugin_instance(plugins.clone(), instance).await {
error!("插入插件实例失败: {}", e);
return Err(e);
}
}
info!("插件自动加载成功: {}", plugin_file);
trace!("插件创建处理完成");
Ok(())
}
Err(e) => {
error!("插件自动加载失败: {} - {}", plugin_file, e);
Err(e)
}
}
}
async fn handle_plugin_remove(
plugins: &PluginMap,
plugin_path: &Path,
) -> PluginManagerResult<()> {
trace!("处理插件删除事件: {}", plugin_path.display());
let file_name = plugin_path
.file_stem()
.and_then(|s| s.to_str())
.ok_or_else(|| {
error!("无法从文件路径获取插件名称: {}", plugin_path.display());
PluginManagerError::NotFound("无法从文件路径获取插件名称".to_string())
})?;
debug!("从文件名推断插件: {}", file_name);
let plugin_id = {
let plugins_guard = plugins.read().await;
plugins_guard
.iter()
.find_map(|(id, instance)| {
let plugin_name = &instance.metadata.name;
if file_name.contains(id) || file_name.contains(plugin_name) {
trace!("找到匹配的插件: {} (ID: {})", plugin_name, id);
Some(id.clone())
} else {
None
}
})
};
if let Some(plugin_id) = plugin_id {
info!("找到匹配的插件 {},执行卸载", plugin_id);
debug!("开始卸载插件: {}", plugin_id);
perform_unload(plugins.clone(), &plugin_id).await?;
trace!("插件删除处理完成");
Ok(())
} else {
warn!(
"未找到与文件 {} 对应的插件,跳过卸载",
plugin_path.display()
);
debug!("文件名: {}", file_name);
Ok(())
}
}