use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, OnceLock};
use serde_json::json;
use crate::catalog::Catalog;
use crate::configuration::{Configuration, FoundryLocalConfig, Logger};
use crate::detail::core_interop::CoreInterop;
use crate::detail::ModelLoadManager;
use crate::error::{FoundryLocalError, Result};
use crate::types::{EpDownloadResult, EpInfo};
/// Process-wide slot for the single `FoundryLocalManager`; it is filled at most
/// once, at the end of a successful `FoundryLocalManager::create` call.
static INSTANCE: OnceLock<FoundryLocalManager> = OnceLock::new();
/// Lock taken by `FoundryLocalManager::create` while it builds the manager, so
/// that concurrent callers do not run the initialisation steps at the same time.
static INIT_GUARD: Mutex<()> = Mutex::new(());
/// Manager that owns the core interop handle, the catalog, and the list of
/// web-service URLs. The shared instance is obtained through `create`.
pub struct FoundryLocalManager {
/// Handle used to execute commands against the core.
core: Arc<CoreInterop>,
/// Catalog built during `create` and exposed by `catalog()`.
catalog: Catalog,
/// URLs parsed from the `start_service` response; cleared by `stop_web_service`.
urls: Mutex<Vec<String>>,
/// Logger returned by `Configuration::new`. It is never read in this file;
/// presumably it is stored only to keep it alive — TODO confirm.
_logger: Option<Box<dyn Logger>>,
}
/// Boxed progress callback stored by `EpDownloadBuilder`. It receives a name
/// and a percentage, both taken from a streamed progress chunk.
type EpDownloadProgressCallback = Box<dyn FnMut(&str, f64) + Send + 'static>;
/// Builder that collects the options for an EP download before running it.
///
/// Obtained from `FoundryLocalManager::download_and_register_eps_builder`.
/// Configuring the builder has no effect on its own: the download command is
/// only issued once `run` is called and awaited, hence the `must_use` marker.
#[must_use = "an EpDownloadBuilder does nothing until `run()` is called and awaited"]
pub struct EpDownloadBuilder<'a> {
    /// Manager that will execute the download.
    manager: &'a FoundryLocalManager,
    /// EP names to request; `None` means no names were specified.
    names: Option<Vec<String>>,
    /// Optional callback receiving `(name, percent)` progress updates.
    progress_callback: Option<EpDownloadProgressCallback>,
    /// Optional flag forwarded to the cancellable download call.
    cancel_flag: Option<Arc<AtomicBool>>,
}
impl<'a> EpDownloadBuilder<'a> {
fn new(manager: &'a FoundryLocalManager) -> Self {
Self {
manager,
names: None,
progress_callback: None,
cancel_flag: None,
}
}
pub fn names<I, S>(mut self, names: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.names = Some(names.into_iter().map(Into::into).collect());
self
}
pub fn progress<F>(mut self, callback: F) -> Self
where
F: FnMut(&str, f64) + Send + 'static,
{
self.progress_callback = Some(Box::new(callback));
self
}
pub fn cancel(mut self, cancel_flag: Arc<AtomicBool>) -> Self {
self.cancel_flag = Some(cancel_flag);
self
}
pub async fn run(self) -> Result<EpDownloadResult> {
let names: Option<Vec<&str>> = self
.names
.as_ref()
.map(|names| names.iter().map(String::as_str).collect());
self.manager
.download_and_register_eps_impl(
names.as_deref(),
self.progress_callback,
self.cancel_flag,
)
.await
}
}
impl FoundryLocalManager {
/// Returns the process-wide manager, initialising it from `config` on the
/// first successful call.
///
/// If a manager already exists it is returned as-is and `config` is ignored.
/// Initialisation is serialised through `INIT_GUARD`; the instance is only
/// stored after every step has succeeded, so a failed attempt leaves the slot
/// empty and a later call can try again.
///
/// # Errors
/// Returns `FoundryLocalError::Internal` if the initialisation guard is
/// poisoned, and propagates any error from building the configuration,
/// creating the core, running the `initialize` command, or building the catalog.
pub fn create(config: FoundryLocalConfig) -> Result<&'static Self> {
    // Fast path: already initialised, no lock needed.
    if let Some(existing) = INSTANCE.get() {
        return Ok(existing);
    }

    let _guard = INIT_GUARD.lock().map_err(|_| FoundryLocalError::Internal {
        reason: "initialisation guard poisoned".into(),
    })?;

    // Another caller may have completed initialisation while we waited for the lock.
    if let Some(existing) = INSTANCE.get() {
        return Ok(existing);
    }

    let (mut internal_config, logger) = Configuration::new(config)?;
    let core = Arc::new(CoreInterop::new(&mut internal_config)?);

    let init_params = json!({ "Params": internal_config.params });
    core.execute_command("initialize", Some(&init_params))?;

    let service_endpoint = internal_config.params.get("WebServiceExternalUrl").cloned();
    let model_load_manager = Arc::new(ModelLoadManager::new(Arc::clone(&core), service_endpoint));
    // The load manager is not used again here, so hand it over instead of cloning.
    let catalog = Catalog::new(Arc::clone(&core), model_load_manager)?;

    let manager = FoundryLocalManager {
        core,
        catalog,
        urls: Mutex::new(Vec::new()),
        _logger: logger,
    };

    // `get_or_init` stores `manager` when the slot is empty and otherwise
    // returns the existing instance, so no `unwrap` is needed afterwards.
    Ok(INSTANCE.get_or_init(|| manager))
}
/// Returns a reference to the catalog owned by this manager.
pub fn catalog(&self) -> &Catalog {
&self.catalog
}
/// Returns a copy of the currently stored web-service URLs.
///
/// # Errors
/// Returns `FoundryLocalError::Internal` if the URL lock is poisoned.
pub fn urls(&self) -> Result<Vec<String>> {
    match self.urls.lock() {
        Ok(guard) => Ok(guard.to_vec()),
        Err(_) => Err(FoundryLocalError::Internal {
            reason: "Failed to acquire urls lock".into(),
        }),
    }
}
/// Runs the `start_service` command and stores the URLs it reports.
///
/// A blank response is treated as an empty URL list; otherwise the response
/// is parsed as a JSON array of strings.
///
/// # Errors
/// Propagates command and JSON parsing failures, and returns
/// `FoundryLocalError::Internal` if the URL lock is poisoned.
pub async fn start_web_service(&self) -> Result<()> {
    let response = self
        .core
        .execute_command_async("start_service".into(), None)
        .await?;

    let endpoints = match response.trim() {
        "" => Vec::new(),
        _ => serde_json::from_str::<Vec<String>>(&response)?,
    };

    // The guard is taken after the last `.await`, so it is never held across one.
    let mut guard = self.urls.lock().map_err(|_| FoundryLocalError::Internal {
        reason: "Failed to acquire urls lock".into(),
    })?;
    *guard = endpoints;
    Ok(())
}
/// Runs the `stop_service` command and then clears the stored URLs.
///
/// # Errors
/// Propagates command failures (the URLs are left untouched in that case) and
/// returns `FoundryLocalError::Internal` if the URL lock is poisoned.
pub async fn stop_web_service(&self) -> Result<()> {
    self.core
        .execute_command_async("stop_service".into(), None)
        .await?;

    match self.urls.lock() {
        Ok(mut guard) => {
            guard.clear();
            Ok(())
        }
        Err(_) => Err(FoundryLocalError::Internal {
            reason: "Failed to acquire urls lock".into(),
        }),
    }
}
/// Runs the `discover_eps` command and parses its response as a list of `EpInfo`.
///
/// # Errors
/// Propagates command failures and JSON parsing failures.
pub fn discover_eps(&self) -> Result<Vec<EpInfo>> {
    let payload = self.core.execute_command("discover_eps", None)?;
    Ok(serde_json::from_str::<Vec<EpInfo>>(&payload)?)
}
/// Downloads and registers EPs without progress reporting or cancellation.
///
/// `names` optionally restricts the request to the listed EPs.
pub async fn download_and_register_eps(
    &self,
    names: Option<&[&str]>,
) -> Result<EpDownloadResult> {
    // `None` alone cannot pin the callback type parameter, so name a plain
    // function-pointer type for it.
    let no_progress: Option<fn(&str, f64)> = None;
    let no_cancel = None;
    self.download_and_register_eps_impl(names, no_progress, no_cancel)
        .await
}
/// Downloads and registers EPs, reporting progress through `progress_callback`.
///
/// `names` optionally restricts the request to the listed EPs. The callback
/// receives `(name, percent)` updates; no cancel flag is used.
pub async fn download_and_register_eps_with_progress<F>(
    &self,
    names: Option<&[&str]>,
    progress_callback: F,
) -> Result<EpDownloadResult>
where
    F: FnMut(&str, f64) + Send + 'static,
{
    let on_progress = Some(progress_callback);
    let no_cancel = None;
    self.download_and_register_eps_impl(names, on_progress, no_cancel)
        .await
}
/// Returns an `EpDownloadBuilder` bound to this manager, on which names, a
/// progress callback, and a cancel flag can be set before calling `run`.
pub fn download_and_register_eps_builder(&self) -> EpDownloadBuilder<'_> {
EpDownloadBuilder::new(self)
}
/// Shared implementation behind every EP download entry point.
///
/// Sends the `download_and_register_eps` command to the core and parses the
/// reply as an `EpDownloadResult`. Which core call is used depends on the
/// supplied options: the plain async call when neither option is given, the
/// streaming call when only a progress callback is given, and the cancellable
/// streaming call whenever a cancel flag is given.
///
/// * `names` – EPs to request, sent as a single comma-joined `Names`
///   parameter; `None` or an empty slice sends the command without parameters.
/// * `progress_callback` – called with `(name, percent)` for every streamed
///   chunk of the form `name|percent` whose percent parses as `f64`; any other
///   chunk is ignored.
/// * `cancel_flag` – forwarded to the cancellable streaming call.
///
/// The catalog cache is invalidated when the result reports success or lists
/// at least one registered EP.
///
/// # Errors
/// Propagates failures from the core command and from deserialising its reply.
async fn download_and_register_eps_impl<F>(
    &self,
    names: Option<&[&str]>,
    progress_callback: Option<F>,
    cancel_flag: Option<Arc<AtomicBool>>,
) -> Result<EpDownloadResult>
where
    F: FnMut(&str, f64) + Send + 'static,
{
    /// Wraps a `(name, percent)` callback as a raw chunk handler: splits each
    /// chunk at the first `|` and forwards it only if the percent part parses.
    fn progress_adapter<C>(mut callback: C) -> impl FnMut(&str) + Send + 'static
    where
        C: FnMut(&str, f64) + Send + 'static,
    {
        move |chunk: &str| {
            if let Some((name, percent)) = chunk.split_once('|') {
                if let Ok(percent) = percent.parse::<f64>() {
                    callback(name, percent);
                }
            }
        }
    }

    const COMMAND: &str = "download_and_register_eps";

    // An empty name list is treated the same as no list at all.
    let params = names
        .filter(|requested| !requested.is_empty())
        .map(|requested| json!({ "Params": { "Names": requested.join(",") } }));

    let raw = match (progress_callback, cancel_flag) {
        (Some(callback), Some(flag)) => {
            self.core
                .execute_command_streaming_cancellable_async(
                    COMMAND.into(),
                    params,
                    progress_adapter(callback),
                    flag,
                )
                .await?
        }
        (Some(callback), None) => {
            self.core
                .execute_command_streaming_async(
                    COMMAND.into(),
                    params,
                    progress_adapter(callback),
                )
                .await?
        }
        (None, Some(flag)) => {
            // Cancellation is only offered by the streaming call, so stream
            // with a handler that discards every chunk.
            self.core
                .execute_command_streaming_cancellable_async(
                    COMMAND.into(),
                    params,
                    |_chunk: &str| {},
                    flag,
                )
                .await?
        }
        (None, None) => {
            self.core
                .execute_command_async(COMMAND.into(), params)
                .await?
        }
    };

    let result: EpDownloadResult = serde_json::from_str(&raw)?;
    if result.success || !result.registered_eps.is_empty() {
        self.catalog.invalidate_cache();
    }
    Ok(result)
}
}