use super::types::*;
use crate::actor::Actor;
use crate::websocket_rpc::{DynASBClient, DynASBConfig, WebSocketRpcClient, WebSocketScriptActor};
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::info;
#[async_trait]
pub trait ActorFactory: Send + Sync {
async fn create_instance(&self) -> Result<Box<dyn Actor>>;
fn get_metadata(&self) -> &DiscoveredScriptActor;
}
pub struct ScriptActorFactory {
pub metadata: DiscoveredScriptActor,
pub websocket_url: String,
pub redis_url: String,
}
impl ScriptActorFactory {
pub fn new(metadata: DiscoveredScriptActor) -> Result<Self> {
Ok(Self {
metadata,
websocket_url: std::env::var("WEBSOCKET_URL")
.unwrap_or_else(|_| "ws://localhost:8080".to_string()),
redis_url: std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://localhost:6379".to_string()),
})
}
pub fn with_urls(
metadata: DiscoveredScriptActor,
websocket_url: String,
redis_url: String,
) -> Self {
Self {
metadata,
websocket_url,
redis_url,
}
}
}
#[async_trait]
impl ActorFactory for ScriptActorFactory {
async fn create_instance(&self) -> Result<Box<dyn Actor>> {
let rpc_client = Arc::new(WebSocketRpcClient::new(self.websocket_url.clone()));
let actor =
WebSocketScriptActor::new(self.metadata.clone(), rpc_client, self.redis_url.clone())
.await;
Ok(Box::new(actor))
}
fn get_metadata(&self) -> &DiscoveredScriptActor {
&self.metadata
}
}
pub struct PythonActorFactory {
base_factory: ScriptActorFactory,
#[allow(dead_code)]
python_path: Option<String>,
}
impl PythonActorFactory {
pub fn new(metadata: DiscoveredScriptActor) -> Result<Self> {
Ok(Self {
base_factory: ScriptActorFactory::new(metadata)?,
python_path: std::env::var("PYTHON_PATH").ok(),
})
}
}
#[async_trait]
impl ActorFactory for PythonActorFactory {
async fn create_instance(&self) -> Result<Box<dyn Actor>> {
self.base_factory.create_instance().await
}
fn get_metadata(&self) -> &DiscoveredScriptActor {
self.base_factory.get_metadata()
}
}
pub struct JavaScriptActorFactory {
base_factory: ScriptActorFactory,
#[allow(dead_code)]
node_path: Option<String>,
}
impl JavaScriptActorFactory {
pub fn new(metadata: DiscoveredScriptActor) -> Result<Self> {
Ok(Self {
base_factory: ScriptActorFactory::new(metadata)?,
node_path: std::env::var("NODE_PATH").ok(),
})
}
}
#[async_trait]
impl ActorFactory for JavaScriptActorFactory {
async fn create_instance(&self) -> Result<Box<dyn Actor>> {
self.base_factory.create_instance().await
}
fn get_metadata(&self) -> &DiscoveredScriptActor {
self.base_factory.get_metadata()
}
}
pub struct DynASBActorFactory {
metadata: DiscoveredScriptActor,
client: Arc<Mutex<DynASBClient>>,
}
impl DynASBActorFactory {
pub fn new(metadata: DiscoveredScriptActor, client: Arc<Mutex<DynASBClient>>) -> Self {
Self { metadata, client }
}
pub fn with_config(metadata: DiscoveredScriptActor, config: DynASBConfig) -> Self {
Self {
metadata,
client: Arc::new(Mutex::new(DynASBClient::new(config))),
}
}
}
#[async_trait]
impl ActorFactory for DynASBActorFactory {
async fn create_instance(&self) -> Result<Box<dyn Actor>> {
let metadata = &self.metadata;
let code = std::fs::read_to_string(&metadata.file_path).map_err(|e| {
anyhow::anyhow!(
"Failed to read script source at {:?}: {}",
metadata.file_path,
e
)
})?;
let runtime_str = match metadata.runtime {
ScriptRuntime::Python => "python",
ScriptRuntime::JavaScript => "nodejs",
};
let dependencies: Option<HashMap<String, String>> =
if metadata.workspace_metadata.dependencies.is_empty() {
None
} else {
Some(
metadata
.workspace_metadata
.dependencies
.iter()
.map(|dep| (dep.clone(), "*".to_string()))
.collect(),
)
};
let timeout = metadata.workspace_metadata.runtime_requirements.timeout;
let mut client = self.client.lock().await;
let func = client
.deploy(
&metadata.component,
runtime_str,
&code,
&metadata.component, dependencies,
Some(timeout),
)
.await?;
info!(
"Deployed script actor '{}' to dynASB as function '{}'",
metadata.component, func.function_id
);
client
.wait_until_ready(
&func.function_id,
Duration::from_secs(timeout as u64),
Duration::from_millis(500),
)
.await?;
let actor = client.create_actor(&func, metadata.clone()).await?;
Ok(Box::new(actor))
}
fn get_metadata(&self) -> &DiscoveredScriptActor {
&self.metadata
}
}