#![allow(clippy::duplicated_attributes)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use crate::{Resolver, ServerConfig};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct LogEntry {
pub query: String,
pub app_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
pub detected_intents: Vec<String>,
pub confidence: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub flag: Option<String>,
pub timestamp_ms: u64,
pub router_version: u64,
}
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct PendingCorrection {
pub namespace: String,
pub query: String,
pub wrong_intent: String,
pub right_intent: String,
}
#[derive(Debug, serde::Deserialize)]
struct BatchNsResult {
up_to_date: bool,
version: u64,
#[serde(default)]
export: Option<String>,
}
#[derive(Debug, serde::Deserialize)]
struct BatchSyncResponse {
#[serde(default)]
namespaces: HashMap<String, BatchNsResult>,
#[allow(dead_code)]
logs_accepted: Option<usize>,
#[allow(dead_code)]
corrections_applied: Option<usize>,
}
pub(crate) struct ConnectState {
pub server: ServerConfig,
pub log_buf: Arc<Mutex<Vec<LogEntry>>>,
pub correction_buf: Arc<Mutex<Vec<PendingCorrection>>>,
pub versions: Arc<RwLock<HashMap<String, u64>>>,
pub http: reqwest::blocking::Client,
}
impl ConnectState {
pub fn new(server: ServerConfig) -> Result<Self, crate::Error> {
let http = reqwest::blocking::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| crate::Error::Connect(format!("HTTP client: {}", e)))?;
Ok(Self {
server,
log_buf: Arc::new(Mutex::new(Vec::new())),
correction_buf: Arc::new(Mutex::new(Vec::new())),
versions: Arc::new(RwLock::new(HashMap::new())),
http,
})
}
pub fn list_remote_namespaces(&self) -> Result<Vec<String>, crate::Error> {
let url = format!("{}/api/namespaces", self.server.url);
let mut req = self.http.get(&url);
if let Some(ref key) = self.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req
.send()
.map_err(|e| crate::Error::Connect(format!("list namespaces: {}", e)))?;
if !resp.status().is_success() {
return Err(crate::Error::Connect(format!(
"list namespaces: HTTP {}",
resp.status()
)));
}
let arr: Vec<serde_json::Value> = resp
.json()
.map_err(|e| crate::Error::Connect(format!("list namespaces parse: {}", e)))?;
Ok(arr
.iter()
.filter_map(|v| v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()))
.collect())
}
pub fn pull(&self, app_id: &str) -> Result<Option<(Resolver, u64)>, crate::Error> {
let url = format!("{}/api/sync", self.server.url);
let mut versions = HashMap::new();
versions.insert(app_id.to_string(), 0u64);
let body = serde_json::json!({
"local_versions": versions,
"logs": Vec::<LogEntry>::new(),
"corrections": Vec::<PendingCorrection>::new(),
"tick_interval_secs": self.server.tick_interval_secs,
"library_version": format!("microresolve-rust/{}", env!("CARGO_PKG_VERSION")),
});
let mut req = self.http.post(&url).json(&body);
if let Some(ref key) = self.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req
.send()
.map_err(|e| crate::Error::Connect(format!("pull {}: {}", app_id, e)))?;
if !resp.status().is_success() {
return Err(crate::Error::Connect(format!(
"pull {}: HTTP {}",
app_id,
resp.status()
)));
}
let parsed: BatchSyncResponse = resp
.json()
.map_err(|e| crate::Error::Connect(e.to_string()))?;
match parsed.namespaces.get(app_id) {
Some(ns) if !ns.up_to_date => {
if let Some(json) = ns.export.as_ref() {
let r = Resolver::import_json(json)?;
self.versions
.write()
.unwrap()
.insert(app_id.to_string(), ns.version);
Ok(Some((r, ns.version)))
} else {
self.versions.write().unwrap().insert(app_id.to_string(), 0);
Ok(None)
}
}
_ => {
self.versions.write().unwrap().insert(app_id.to_string(), 0);
Ok(None)
}
}
}
pub fn push_correct(
&self,
app_id: &str,
query: &str,
wrong_intent: &str,
right_intent: &str,
) -> Result<(), crate::Error> {
let mut buf = self.correction_buf.lock().unwrap();
buf.push(PendingCorrection {
namespace: app_id.to_string(),
query: query.to_string(),
wrong_intent: wrong_intent.to_string(),
right_intent: right_intent.to_string(),
});
Ok(())
}
pub fn buffer_log(&self, entry: LogEntry) {
let mut buf = self.log_buf.lock().unwrap();
if buf.len() >= self.server.log_buffer_max && !buf.is_empty() {
buf.remove(0); }
buf.push(entry);
}
}
pub(crate) fn run_background<F>(state: Arc<ConnectState>, apply_pull: F)
where
F: Fn(&str, Resolver, u64) + Send + 'static,
{
let tick = Duration::from_secs(state.server.tick_interval_secs.max(1));
loop {
std::thread::sleep(tick);
match batch_sync(&state) {
Ok(resp) => {
for (app_id, ns_result) in resp.namespaces {
if !ns_result.up_to_date {
if let Some(json) = ns_result.export {
match Resolver::import_json(&json) {
Ok(resolver) => {
state
.versions
.write()
.unwrap()
.insert(app_id.clone(), ns_result.version);
apply_pull(&app_id, resolver, ns_result.version);
eprintln!(
"[microresolve-connect] reloaded {} → v{}",
app_id, ns_result.version
);
}
Err(e) => eprintln!(
"[microresolve-connect] import error {}: {}",
app_id, e
),
}
}
}
}
}
Err(e) => eprintln!("[microresolve-connect] batch sync error: {}", e),
}
}
}
fn batch_sync(state: &ConnectState) -> Result<BatchSyncResponse, crate::Error> {
let logs: Vec<LogEntry> = {
let mut buf = state.log_buf.lock().unwrap();
std::mem::take(&mut *buf)
};
let corrections: Vec<PendingCorrection> = {
let mut buf = state.correction_buf.lock().unwrap();
std::mem::take(&mut *buf)
};
let local_versions: HashMap<String, u64> = state.versions.read().unwrap().clone();
let url = format!("{}/api/sync", state.server.url);
let body = serde_json::json!({
"local_versions": local_versions,
"logs": logs,
"corrections": corrections,
"tick_interval_secs": state.server.tick_interval_secs,
"library_version": format!("microresolve-rust/{}", env!("CARGO_PKG_VERSION")),
});
let mut req = state.http.post(&url).json(&body);
if let Some(ref key) = state.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req.send().map_err(|e| {
let mut log_buf = state.log_buf.lock().unwrap();
let mut c_buf = state.correction_buf.lock().unwrap();
let mut restored = logs.clone();
restored.extend(log_buf.drain(..));
restored.truncate(state.server.log_buffer_max);
*log_buf = restored;
let mut rc = corrections.clone();
rc.extend(c_buf.drain(..));
*c_buf = rc;
crate::Error::Connect(format!("batch sync send: {}", e))
})?;
if !resp.status().is_success() {
let status = resp.status();
let mut log_buf = state.log_buf.lock().unwrap();
let mut c_buf = state.correction_buf.lock().unwrap();
let mut restored = logs;
restored.extend(log_buf.drain(..));
restored.truncate(state.server.log_buffer_max);
*log_buf = restored;
let mut rc = corrections;
rc.extend(c_buf.drain(..));
*c_buf = rc;
return Err(crate::Error::Connect(format!("batch sync HTTP {}", status)));
}
resp.json()
.map_err(|e| crate::Error::Connect(format!("batch sync parse: {}", e)))
}
pub(crate) fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}