#![allow(clippy::duplicated_attributes)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use crate::{Resolver, ServerConfig};
/// A single log record held in `ConnectState::log_buf` until it is shipped to the server.
///
/// `flush_logs` groups buffered entries by `app_id` and serializes each group as the
/// JSON body of a POST to `/api/ingest`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct LogEntry {
/// Query text for this record (presumably the user query that was routed; confirm
/// with the code that builds entries).
pub query: String,
/// Application this entry belongs to; `flush_logs` batches by this value and sends it
/// as the `X-Namespace-ID` header.
pub app_id: String,
/// Optional session identifier; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Intent names attached to this record (presumably the intents matched for `query`;
/// confirm with the producer).
pub detected_intents: Vec<String>,
/// Confidence carried as a string; the set of valid values is not visible in this file.
pub confidence: String,
/// Optional flag; left out of the serialized output when `None`. Its meaning is not
/// visible in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub flag: Option<String>,
/// Timestamp in milliseconds (presumably Unix-epoch milliseconds as produced by
/// `now_ms`; confirm with the producer).
pub timestamp_ms: u64,
/// Version number recorded with the entry (presumably the resolver version in use
/// when the entry was created; confirm with the producer).
pub router_version: u64,
}
/// JSON body decoded from a `GET /api/sync` response by `ConnectState::pull` and
/// `check_and_apply`.
#[derive(Debug, serde::Deserialize)]
struct SyncResponse {
/// When `true`, `check_and_apply` reports that there is nothing to apply.
up_to_date: bool,
/// Version reported by the server; returned alongside the imported resolver.
version: u64,
/// Serialized export handed to `Resolver::import_json`; a missing field decodes
/// as `None` because of `#[serde(default)]`.
#[serde(default)]
export: Option<String>,
}
/// Shared state for talking to the sync server: configuration, the pending log buffer,
/// the per-app version map, and the HTTP client.
pub(crate) struct ConnectState {
/// Server settings; this file reads `url`, `api_key`, `log_buffer_max` and
/// `tick_interval_secs` from it.
pub server: ServerConfig,
/// Log entries waiting to be sent; filled by `buffer_log` and emptied by `flush_logs`.
pub log_buf: Arc<Mutex<Vec<LogEntry>>>,
/// Map from app id to the version last recorded for it; its keys are the apps that
/// `run_background` polls.
pub versions: Arc<RwLock<HashMap<String, u64>>>,
/// Blocking HTTP client used for every request in this file.
pub http: reqwest::blocking::Client,
}
impl ConnectState {
    /// Creates the connection state for `server` with empty log buffer and version map.
    ///
    /// The HTTP client is built with a 10 second request timeout.
    ///
    /// # Errors
    /// Returns `Error::Connect` if the HTTP client cannot be constructed.
    pub fn new(server: ServerConfig) -> Result<Self, crate::Error> {
        let http = reqwest::blocking::Client::builder()
            .timeout(Duration::from_secs(10))
            .build()
            .map_err(|e| crate::Error::Connect(format!("HTTP client: {}", e)))?;
        Ok(Self {
            server,
            log_buf: Arc::new(Mutex::new(Vec::new())),
            versions: Arc::new(RwLock::new(HashMap::new())),
            http,
        })
    }

    /// Requests the export for `app_id` from `/api/sync?version=0` and imports it.
    ///
    /// Returns `Ok(Some((resolver, version)))` when the response carries an export, and
    /// `Ok(None)` when it does not. In both cases `app_id` is recorded in `versions`
    /// (with the server's version, or `0` when there was no export), which is what makes
    /// the app visible to the polling loop in `run_background`.
    ///
    /// # Errors
    /// Returns `Error::Connect` (prefixed with `pull {app_id}`) on a transport failure,
    /// a non-success HTTP status, or a response body that cannot be decoded. Errors from
    /// `Resolver::import_json` are propagated with `?`.
    ///
    /// # Panics
    /// Panics if the `versions` lock is poisoned.
    pub fn pull(&self, app_id: &str) -> Result<Option<(Resolver, u64)>, crate::Error> {
        let url = format!("{}/api/sync?version=0", self.server.url);
        let mut req = self.http.get(&url).header("X-Namespace-ID", app_id);
        if let Some(ref key) = self.server.api_key {
            req = req.header("X-Api-Key", key);
        }
        let resp = req
            .send()
            .map_err(|e| crate::Error::Connect(format!("pull {}: {}", app_id, e)))?;
        if !resp.status().is_success() {
            return Err(crate::Error::Connect(format!(
                "pull {}: HTTP {}",
                app_id,
                resp.status()
            )));
        }
        // Keep the same `pull {app_id}` prefix as the other failure paths so a malformed
        // body can be attributed to the app that was being pulled.
        let sync: SyncResponse = resp
            .json()
            .map_err(|e| crate::Error::Connect(format!("pull {}: {}", app_id, e)))?;
        match sync.export {
            Some(json) => {
                let r = Resolver::import_json(&json)?;
                self.versions
                    .write()
                    .unwrap()
                    .insert(app_id.to_string(), sync.version);
                Ok(Some((r, sync.version)))
            }
            None => {
                self.versions.write().unwrap().insert(app_id.to_string(), 0);
                Ok(None)
            }
        }
    }

    /// Posts a correction for `query` to `/api/correct`, naming the intent that was
    /// wrong and the intent that is right, under the namespace `app_id`.
    ///
    /// # Errors
    /// Returns `Error::Connect` on a transport failure or a non-success HTTP status.
    pub fn push_correct(
        &self,
        app_id: &str,
        query: &str,
        wrong_intent: &str,
        right_intent: &str,
    ) -> Result<(), crate::Error> {
        let url = format!("{}/api/correct", self.server.url);
        let body = serde_json::json!({
            "query": query,
            "wrong_intent": wrong_intent,
            "right_intent": right_intent,
        });
        let mut req = self
            .http
            .post(&url)
            .header("X-Namespace-ID", app_id)
            .json(&body);
        if let Some(ref key) = self.server.api_key {
            req = req.header("X-Api-Key", key);
        }
        let resp = req
            .send()
            .map_err(|e| crate::Error::Connect(format!("server push: {}", e)))?;
        if !resp.status().is_success() {
            return Err(crate::Error::Connect(format!(
                "server returned {}",
                resp.status()
            )));
        }
        Ok(())
    }

    /// Appends `entry` to the log buffer, evicting the oldest entry first when the
    /// buffer has already reached `server.log_buffer_max`.
    ///
    /// With `log_buffer_max == 0` the buffer still keeps the single most recent entry:
    /// the emptiness guard skips the eviction and the push always happens.
    ///
    /// # Panics
    /// Panics if the `log_buf` lock is poisoned.
    pub fn buffer_log(&self, entry: LogEntry) {
        let mut buf = self.log_buf.lock().unwrap();
        // The emptiness check keeps `remove(0)` from panicking when the limit is zero.
        if buf.len() >= self.server.log_buffer_max && !buf.is_empty() {
            buf.remove(0);
        }
        buf.push(entry);
    }

    /// Returns the version recorded for `app_id`, or `0` when none is recorded.
    ///
    /// # Panics
    /// Panics if the `versions` lock is poisoned.
    pub fn version_of(&self, app_id: &str) -> u64 {
        self.versions
            .read()
            .unwrap()
            .get(app_id)
            .copied()
            .unwrap_or(0)
    }
}
/// Runs the background sync loop; this function never returns.
///
/// Every tick (`server.tick_interval_secs`, clamped to at least one second) it:
/// 1. flushes buffered log entries with `flush_logs`;
/// 2. for each app id currently present in `state.versions`, asks the server whether a
///    newer export exists and, if so, hands the imported resolver to `apply_pull`.
///
/// After `apply_pull` returns, the new version is recorded in `state.versions` so the
/// next tick asks the server about that version instead of re-downloading and
/// re-applying the same export. This mirrors what `ConnectState::pull` does after a
/// successful import.
///
/// Sync errors are written to stderr and do not stop the loop.
///
/// # Panics
/// Panics if the `versions` lock is poisoned.
pub(crate) fn run_background<F>(state: Arc<ConnectState>, apply_pull: F)
where
    F: Fn(&str, Resolver, u64) + Send + 'static,
{
    let tick = Duration::from_secs(state.server.tick_interval_secs.max(1));
    loop {
        std::thread::sleep(tick);
        flush_logs(&state);
        // Snapshot the keys so the read lock is released before any network call.
        let app_ids: Vec<String> = state.versions.read().unwrap().keys().cloned().collect();
        for app_id in app_ids {
            let local_v = state.version_of(&app_id);
            match check_and_apply(&state, &app_id, local_v) {
                Ok(Some((resolver, version))) => {
                    apply_pull(&app_id, resolver, version);
                    eprintln!("[microresolve-connect] reloaded {} → v{}", app_id, version);
                    // Remember what was just applied; without this the same export
                    // would be fetched and applied again on every tick.
                    state.versions.write().unwrap().insert(app_id, version);
                }
                Ok(None) => {}
                Err(e) => eprintln!("[microresolve-connect] sync error {}: {}", app_id, e),
            }
        }
    }
}
fn check_and_apply(
state: &ConnectState,
app_id: &str,
local_version: u64,
) -> Result<Option<(Resolver, u64)>, crate::Error> {
let url = format!("{}/api/sync?version={}", state.server.url, local_version);
let mut req = state.http.get(&url).header("X-Namespace-ID", app_id);
if let Some(ref key) = state.server.api_key {
req = req.header("X-Api-Key", key);
}
let resp = req
.send()
.map_err(|e| crate::Error::Connect(e.to_string()))?;
if !resp.status().is_success() {
return Err(crate::Error::Connect(format!("HTTP {}", resp.status())));
}
let sync: SyncResponse = resp
.json()
.map_err(|e| crate::Error::Connect(e.to_string()))?;
if sync.up_to_date {
return Ok(None);
}
let json = sync
.export
.ok_or_else(|| crate::Error::Connect("no export in response".into()))?;
let resolver = Resolver::import_json(&json)?;
Ok(Some((resolver, sync.version)))
}
/// Sends every buffered log entry to `/api/ingest`, one POST per app id.
///
/// The buffer is emptied under the lock first, so new entries can be buffered while
/// the requests are in flight. A batch counts as failed when the request cannot be
/// sent or when the server answers with an HTTP error status; failed batches are
/// reported on stderr and put back at the front of the buffer, ahead of anything
/// buffered in the meantime. If the result exceeds `server.log_buffer_max`, the oldest
/// entries are dropped, the same eviction policy `ConnectState::buffer_log` uses.
///
/// # Panics
/// Panics if the `log_buf` lock is poisoned.
fn flush_logs(state: &ConnectState) {
    let entries: Vec<LogEntry> = {
        let mut buf = state.log_buf.lock().unwrap();
        std::mem::take(&mut *buf)
    };
    if entries.is_empty() {
        return;
    }

    // Group by owning the entries so a failed batch can be requeued without cloning.
    let mut by_app: HashMap<String, Vec<LogEntry>> = HashMap::new();
    for entry in entries {
        by_app.entry(entry.app_id.clone()).or_default().push(entry);
    }

    let url = format!("{}/api/ingest", state.server.url);
    let mut failed: Vec<LogEntry> = Vec::new();
    for (app_id, batch) in by_app {
        let mut req = state
            .http
            .post(&url)
            .header("X-Namespace-ID", &app_id)
            .json(&batch);
        if let Some(ref key) = state.server.api_key {
            req = req.header("X-Api-Key", key);
        }
        // `send` only fails on transport problems; a 4xx/5xx reply must also count as a
        // failure, otherwise a rejected batch would be silently discarded.
        let outcome = req.send().and_then(|resp| resp.error_for_status());
        if let Err(e) = outcome {
            eprintln!("[microresolve-connect] log flush {}: {}", app_id, e);
            failed.extend(batch);
        }
    }

    if !failed.is_empty() {
        let mut buf = state.log_buf.lock().unwrap();
        // Failed entries are older than whatever was buffered during the flush.
        failed.append(&mut *buf);
        // Enforce the cap by discarding from the front (oldest first).
        let overflow = failed.len().saturating_sub(state.server.log_buffer_max);
        failed.drain(..overflow);
        *buf = failed;
    }
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub(crate) fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // A clock set before 1970 is treated as the epoch itself.
        Err(_) => 0,
    }
}