use rusqlite::params;
use crate::types::{IpcError, IpcResult, ModelEntry, NodeCapabilities, Subscription};
use convergio_db::pool::ConnPool;
pub fn store_models(
pool: &ConnPool,
host: &str,
provider: &str,
models: &[(String, f64, String)],
) -> IpcResult<()> {
let conn = pool.get()?;
for (name, size_gb, quant) in models {
conn.execute(
"INSERT OR REPLACE INTO ipc_model_registry
(host, provider, model, size_gb, quantization, last_seen)
VALUES (?1, ?2, ?3, ?4, ?5, strftime('%Y-%m-%dT%H:%M:%f','now'))",
params![host, provider, name, size_gb, quant],
)?;
}
Ok(())
}
pub fn get_all_models(pool: &ConnPool) -> IpcResult<Vec<ModelEntry>> {
let conn = pool.get()?;
let mut stmt = conn.prepare(
"SELECT host, provider, model, size_gb, quantization, last_seen
FROM ipc_model_registry ORDER BY host, provider, model",
)?;
let models = stmt
.query_map([], |row| {
Ok(ModelEntry {
host: row.get(0)?,
provider: row.get(1)?,
model: row.get(2)?,
size_gb: row.get(3)?,
quantization: row.get(4)?,
last_seen: row.get(5)?,
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(models)
}
pub fn advertise_capabilities(pool: &ConnPool, host: &str) -> IpcResult<()> {
let conn = pool.get()?;
let mut stmt = conn.prepare(
"SELECT provider, GROUP_CONCAT(model)
FROM ipc_model_registry WHERE host = ?1 GROUP BY provider",
)?;
let providers: Vec<(String, String)> = stmt
.query_map(params![host], |row| Ok((row.get(0)?, row.get(1)?)))?
.filter_map(|r| r.ok())
.collect();
for (provider, models_csv) in &providers {
let models: Vec<&str> = models_csv.split(',').collect();
let json = serde_json::to_string(&models).unwrap_or_else(|_| "[]".to_string());
conn.execute(
"INSERT OR REPLACE INTO ipc_node_capabilities (host, provider, models, updated_at)
VALUES (?1, ?2, ?3, strftime('%Y-%m-%dT%H:%M:%f','now'))",
params![host, provider, json],
)?;
}
Ok(())
}
pub fn get_all_capabilities(pool: &ConnPool) -> IpcResult<Vec<NodeCapabilities>> {
let conn = pool.get()?;
let mut stmt = conn.prepare(
"SELECT host, provider, models, updated_at
FROM ipc_node_capabilities ORDER BY host",
)?;
let caps = stmt
.query_map([], |row| {
Ok(NodeCapabilities {
host: row.get(0)?,
provider: row.get(1)?,
models: serde_json::from_str::<Vec<String>>(&row.get::<_, String>(2)?)
.unwrap_or_default(),
updated_at: row.get(3)?,
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(caps)
}
pub fn add_subscription(pool: &ConnPool, sub: &Subscription) -> IpcResult<()> {
let conn = pool.get()?;
let models_json = serde_json::to_string(&sub.models).unwrap_or_else(|_| "[]".to_string());
conn.execute(
"INSERT OR REPLACE INTO ipc_subscriptions
(name, provider, plan, budget_usd, reset_day, models)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
sub.name,
sub.provider,
sub.plan,
sub.budget_usd,
sub.reset_day,
models_json
],
)?;
Ok(())
}
pub fn list_subscriptions(pool: &ConnPool) -> IpcResult<Vec<Subscription>> {
let conn = pool.get()?;
let mut stmt = conn.prepare(
"SELECT name, provider, plan, budget_usd, reset_day, models
FROM ipc_subscriptions ORDER BY name",
)?;
let subs = stmt
.query_map([], |row| {
Ok(Subscription {
name: row.get(0)?,
provider: row.get(1)?,
plan: row.get(2)?,
budget_usd: row.get(3)?,
reset_day: row.get(4)?,
models: serde_json::from_str::<Vec<String>>(&row.get::<_, String>(5)?)
.unwrap_or_default(),
})
})?
.filter_map(|r| r.ok())
.collect();
Ok(subs)
}
pub fn remove_subscription(pool: &ConnPool, name: &str) -> IpcResult<bool> {
let conn = pool.get()?;
let deleted = conn.execute(
"DELETE FROM ipc_subscriptions WHERE name = ?1",
params![name],
)?;
if deleted == 0 {
return Err(IpcError::NotFound(format!("subscription '{name}'")));
}
Ok(true)
}
pub async fn probe_ollama() -> Result<Vec<(String, f64, String)>, IpcError> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|e| IpcError::Http(format!("client: {e}")))?;
let resp = client
.get("http://localhost:11434/api/tags")
.send()
.await
.map_err(|e| IpcError::Http(format!("ollama: {e}")))?;
let body: serde_json::Value = resp
.json()
.await
.map_err(|e| IpcError::Http(format!("parse: {e}")))?;
let models = body["models"]
.as_array()
.map(|arr| {
arr.iter()
.map(|m| {
let name = m["name"].as_str().unwrap_or("unknown").to_string();
let size = m["size"].as_u64().unwrap_or(0) as f64 / 1_073_741_824.0;
let quant = m["details"]["quantization_level"]
.as_str()
.unwrap_or("")
.to_string();
(name, size, quant)
})
.collect()
})
.unwrap_or_default();
Ok(models)
}
pub async fn probe_lmstudio() -> Result<Vec<(String, f64, String)>, IpcError> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|e| IpcError::Http(format!("client: {e}")))?;
let resp = client
.get("http://localhost:1234/v1/models")
.send()
.await
.map_err(|e| IpcError::Http(format!("lmstudio: {e}")))?;
let body: serde_json::Value = resp
.json()
.await
.map_err(|e| IpcError::Http(format!("parse: {e}")))?;
let models = body["data"]
.as_array()
.map(|arr| {
arr.iter()
.map(|m| {
let name = m["id"].as_str().unwrap_or("unknown").to_string();
(name, 0.0, String::new())
})
.collect()
})
.unwrap_or_default();
Ok(models)
}
#[cfg(test)]
#[path = "models_tests.rs"]
mod tests;