#![allow(clippy::duplicated_attributes)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use crate::{Resolver, ServerConfig};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct LogEntry {
pub query: String,
pub app_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
pub detected_intents: Vec<String>,
pub confidence: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub flag: Option<String>,
pub timestamp_ms: u64,
pub router_version: u64,
}
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct PendingCorrection {
pub namespace: String,
pub query: String,
pub wrong_intent: String,
pub right_intent: String,
}
#[derive(Debug, serde::Deserialize)]
struct BatchNsResult {
#[serde(default)]
up_to_date: bool,
version: u64,
#[serde(default)]
ops: Option<Vec<crate::oplog::Op>>,
#[serde(default)]
cold_start_required: bool,
}
#[derive(Debug, serde::Deserialize)]
struct BatchSyncResponse {
#[serde(default)]
namespaces: HashMap<String, BatchNsResult>,
#[allow(dead_code)]
logs_accepted: Option<usize>,
#[allow(dead_code)]
corrections_applied: Option<usize>,
}
pub(crate) struct ConnectState {
pub server: ServerConfig,
pub log_buf: Arc<Mutex<Vec<LogEntry>>>,
pub correction_buf: Arc<Mutex<Vec<PendingCorrection>>>,
pub versions: Arc<RwLock<HashMap<String, u64>>>,
pub http: reqwest::blocking::Client,
}
impl ConnectState {
pub fn new(server: ServerConfig) -> Result<Self, crate::Error> {
let http = reqwest::blocking::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| crate::Error::Connect(format!("HTTP client: {}", e)))?;
Ok(Self {
server,
log_buf: Arc::new(Mutex::new(Vec::new())),
correction_buf: Arc::new(Mutex::new(Vec::new())),
versions: Arc::new(RwLock::new(HashMap::new())),
http,
})
}
pub fn list_remote_namespaces(&self) -> Result<Vec<String>, crate::Error> {
let url = format!("{}/api/namespaces", self.server.url);
let mut req = self.http.get(&url);
if let Some(ref key) = self.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req
.send()
.map_err(|e| crate::Error::Connect(format!("list namespaces: {}", e)))?;
if !resp.status().is_success() {
return Err(crate::Error::Connect(format!(
"list namespaces: HTTP {}",
resp.status()
)));
}
let arr: Vec<serde_json::Value> = resp
.json()
.map_err(|e| crate::Error::Connect(format!("list namespaces parse: {}", e)))?;
Ok(arr
.iter()
.filter_map(|v| v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()))
.collect())
}
pub fn fetch_snapshot(
&self,
ns_ids: &[String],
) -> Result<HashMap<String, (Resolver, u64)>, crate::Error> {
let url = format!("{}/api/snapshot", self.server.url);
let body = serde_json::json!({ "namespace_ids": ns_ids });
let mut req = self.http.post(&url).json(&body);
if let Some(ref key) = self.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req
.send()
.map_err(|e| crate::Error::Connect(format!("snapshot: {}", e)))?;
if !resp.status().is_success() {
return Err(crate::Error::Connect(format!(
"snapshot: HTTP {}",
resp.status()
)));
}
#[derive(serde::Deserialize)]
struct SnapshotNs {
version: u64,
export: String,
}
#[derive(serde::Deserialize)]
struct SnapshotResponse {
namespaces: HashMap<String, SnapshotNs>,
}
let parsed: SnapshotResponse = resp
.json()
.map_err(|e| crate::Error::Connect(format!("snapshot parse: {}", e)))?;
let mut result = HashMap::new();
for (id, ns) in parsed.namespaces {
match Resolver::import_json(&ns.export) {
Ok(r) => {
self.versions
.write()
.unwrap()
.insert(id.clone(), ns.version);
result.insert(id, (r, ns.version));
}
Err(e) => eprintln!("[microresolve-connect] snapshot import error {}: {}", id, e),
}
}
Ok(result)
}
pub fn buffer_log(&self, entry: LogEntry) {
let mut buf = self.log_buf.lock().unwrap();
if buf.len() >= self.server.log_buffer_max && !buf.is_empty() {
buf.remove(0); }
buf.push(entry);
}
}
pub fn apply_ops(resolver: &mut Resolver, ops: &[crate::oplog::Op]) -> Result<(), crate::Error> {
use crate::oplog::Op;
for op in ops {
match op {
Op::IntentAdded {
id,
phrases_by_lang,
..
} => {
let seeds = crate::IntentSeeds::Multi(
phrases_by_lang
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
);
let _ = resolver.add_intent(id, seeds);
}
Op::IntentRemoved { id } => resolver.remove_intent(id),
Op::PhraseAdded {
intent_id,
phrase,
lang,
} => {
resolver.add_phrase(intent_id, phrase, lang);
}
Op::PhraseRemoved { intent_id, phrase } => {
resolver.remove_phrase(intent_id, phrase);
}
Op::WeightUpdates { changes } => {
for (token, intent_id, post_weight) in changes {
resolver
.index_mut()
.set_weight(token, intent_id, *post_weight);
}
}
Op::IntentMetadataUpdated { id, edit_json } => {
let edit: crate::IntentEdit = serde_json::from_str(edit_json)
.map_err(|e| crate::Error::Parse(format!("intent edit parse: {}", e)))?;
let _ = resolver.update_intent(id, edit);
}
Op::NamespaceMetadataUpdated { edit_json } => {
let edit: crate::NamespaceEdit = serde_json::from_str(edit_json)
.map_err(|e| crate::Error::Parse(format!("namespace edit parse: {}", e)))?;
let _ = resolver.update_namespace(edit);
}
Op::DomainDescription {
domain,
description,
} => match description {
Some(d) => resolver.set_domain_description(domain, d),
None => resolver.remove_domain_description(domain),
},
}
}
Ok(())
}
pub(crate) fn run_background<F, FD>(state: Arc<ConnectState>, apply_pull: F, apply_delta: FD)
where
F: Fn(&str, Resolver, u64) + Send + 'static,
FD: Fn(&str, &[crate::oplog::Op], u64) -> Result<(), crate::Error> + Send + 'static,
{
let tick = Duration::from_secs(state.server.tick_interval_secs.max(1));
loop {
std::thread::sleep(tick);
match batch_sync(&state) {
Ok(resp) => {
let mut needs_snapshot: Vec<String> = Vec::new();
for (app_id, ns_result) in resp.namespaces {
if ns_result.up_to_date {
continue;
}
if ns_result.cold_start_required {
needs_snapshot.push(app_id);
} else if let Some(ops) = ns_result.ops {
match apply_delta(&app_id, &ops, ns_result.version) {
Ok(()) => {
state
.versions
.write()
.unwrap()
.insert(app_id.clone(), ns_result.version);
eprintln!(
"[microresolve-connect] delta {} → v{} ({} ops applied)",
app_id,
ns_result.version,
ops.len()
);
}
Err(e) => {
eprintln!(
"[microresolve-connect] delta apply error {} (will retry snapshot): {}",
app_id, e
);
}
}
}
}
if !needs_snapshot.is_empty() {
match state.fetch_snapshot(&needs_snapshot) {
Ok(snaps) => {
for (id, (resolver, version)) in snaps {
apply_pull(&id, resolver, version);
eprintln!(
"[microresolve-connect] snapshot reloaded {} → v{}",
id, version
);
}
}
Err(e) => eprintln!("[microresolve-connect] snapshot fetch error: {}", e),
}
}
}
Err(e) => eprintln!("[microresolve-connect] batch sync error: {}", e),
}
}
}
fn batch_sync(state: &ConnectState) -> Result<BatchSyncResponse, crate::Error> {
let logs: Vec<LogEntry> = {
let mut buf = state.log_buf.lock().unwrap();
std::mem::take(&mut *buf)
};
let corrections: Vec<PendingCorrection> = {
let mut buf = state.correction_buf.lock().unwrap();
std::mem::take(&mut *buf)
};
let local_versions: HashMap<String, u64> = state.versions.read().unwrap().clone();
let url = format!("{}/api/sync", state.server.url);
let body = serde_json::json!({
"local_versions": local_versions,
"logs": logs,
"corrections": corrections,
"tick_interval_secs": state.server.tick_interval_secs,
"library_version": format!("microresolve-rust/{}", env!("CARGO_PKG_VERSION")),
"supports_delta": true,
});
let mut req = state.http.post(&url).json(&body);
if let Some(ref key) = state.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req.send().map_err(|e| {
let mut log_buf = state.log_buf.lock().unwrap();
let mut c_buf = state.correction_buf.lock().unwrap();
let mut restored = logs.clone();
restored.extend(log_buf.drain(..));
restored.truncate(state.server.log_buffer_max);
*log_buf = restored;
let mut rc = corrections.clone();
rc.extend(c_buf.drain(..));
*c_buf = rc;
crate::Error::Connect(format!("batch sync send: {}", e))
})?;
if !resp.status().is_success() {
let status = resp.status();
let mut log_buf = state.log_buf.lock().unwrap();
let mut c_buf = state.correction_buf.lock().unwrap();
let mut restored = logs;
restored.extend(log_buf.drain(..));
restored.truncate(state.server.log_buffer_max);
*log_buf = restored;
let mut rc = corrections;
rc.extend(c_buf.drain(..));
*c_buf = rc;
return Err(crate::Error::Connect(format!("batch sync HTTP {}", status)));
}
resp.json()
.map_err(|e| crate::Error::Connect(format!("batch sync parse: {}", e)))
}
pub(crate) fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}