use crate::message::Message;
use crate::node::{NodeContext, NodeErr, NodeError, NodeOut};
use crate::watcher::WatchedType;
use async_trait::async_trait;
use dashmap::DashMap;
use serde_json::Value;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Clone, Debug)] pub struct ProcessInstance {
pub name: String,
pub wasm_path: PathBuf,
}
#[derive(Clone, Debug)] pub struct ProcessWrapper {
instance: ProcessInstance,
description: String,
parameters: Value,
}
impl ProcessWrapper {
pub fn new(instance: ProcessInstance, description: String, parameters: Value) -> Self {
Self {
instance,
description,
parameters,
}
}
pub fn name(&self) -> &str {
&self.instance.name
}
pub fn description(&self) -> String {
self.description.clone()
}
pub fn instance(&self) -> ProcessInstance {
self.instance.clone()
}
pub fn parameters(&self) -> Value {
self.parameters.clone()
}
pub async fn process(&self, _msg: Message, _ctx: &mut NodeContext) -> Result<NodeOut, NodeErr> {
return Err(NodeErr::fail(NodeError::ExecutionFailed(
"Not yet implemented".to_string(),
)));
}
}
#[derive(Clone)]
pub struct ProcessWatcher {
processes: Arc<DashMap<String, Box<ProcessWrapper>>>,
path_to_name: Arc<DashMap<PathBuf, String>>,
}
impl ProcessWatcher {
pub fn new() -> Self {
ProcessWatcher {
processes: Arc::new(DashMap::new()),
path_to_name: Arc::new(DashMap::new()),
}
}
pub fn loaded_processes(&self) -> Arc<DashMap<String, Box<ProcessWrapper>>> {
Arc::clone(&self.processes)
}
pub fn get_process(&self, name: &str) -> Option<Box<ProcessWrapper>> {
self.processes.get(name).map(|entry| entry.value().clone())
}
pub fn register_process(&self, process: Box<ProcessWrapper>) {
self.path_to_name
.insert(process.instance().wasm_path, process.name().to_string());
self.processes.insert(process.name().to_string(), process);
}
pub fn unregister_process(&self, process: Box<ProcessWrapper>) {
self.processes.remove(process.name());
self.path_to_name.remove(&process.instance().wasm_path);
}
pub fn unregister_process_via_path(&self, path: PathBuf) {
if let Some(process_name) = self.path_to_name.get(&path) {
self.processes.remove(process_name.as_str());
}
self.path_to_name.remove(&path);
}
}
async fn load_wasm_executor(path: &Path) -> anyhow::Result<(Box<ProcessWrapper>, String)> {
Err(anyhow::anyhow!(
"WASM‐loading not yet implemented for path: {:?}",
path
))
}
async fn unload_wasm_executor(_process_name: &str) -> anyhow::Result<()> {
Ok(())
}
#[async_trait]
impl WatchedType for ProcessWatcher {
fn is_relevant(&self, path: &Path) -> bool {
matches!(path.extension().and_then(|e| e.to_str()), Some("wasm"))
}
async fn on_create_or_modify(&self, path: &Path) -> anyhow::Result<()> {
let pathbuf = path.to_path_buf();
if let Some((_, old_name)) = self.path_to_name.remove(&pathbuf) {
self.unregister_process_via_path(path.to_path_buf());
self.processes.remove(&old_name);
unload_wasm_executor(&old_name).await?;
}
match load_wasm_executor(path).await {
Ok((process, process_name)) => {
self.register_process(process.clone());
self.processes.insert(process_name.clone(), process);
self.path_to_name.insert(pathbuf, process_name.clone());
Ok(())
}
Err(e) => Err(anyhow::anyhow!(
"Failed to load WASM executor {:?}: {}",
path,
e
)),
}
}
async fn on_remove(&self, path: &Path) -> anyhow::Result<()> {
let pathbuf = path.to_path_buf();
if let Some((_, process_name)) = self.path_to_name.remove(&pathbuf) {
let process = self.get_process(&process_name);
if let Some(pw) = process {
self.unregister_process(pw);
}
self.processes.remove(&process_name);
unload_wasm_executor(&process_name).await?;
}
Ok(())
}
}
impl fmt::Debug for ProcessWatcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut names: Vec<String> = Vec::new();
for entry in self.processes.iter() {
names.push(entry.key().clone());
}
let mut paths: Vec<PathBuf> = Vec::new();
for entry in self.path_to_name.iter() {
paths.push(entry.key().clone());
}
f.debug_struct("ProcessWatcher")
.field("registered_names", &names)
.field("loaded_paths", &paths)
.finish()
}
}