use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crate::engine::EngineConfig;
use crate::engine::analysis_store::EngineAnalysisStore;
use crate::engine::uci_ucci_engine::{EngineConfigureRequest, UciUcciEngine};
use crate::runtime_log;
fn lock_mutex<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
mutex
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
const STREAM_JOIN_TIMEOUT: Duration = Duration::from_secs(2);
pub struct EngineStreamRuntime {
engine: Arc<Mutex<UciUcciEngine>>,
store: Arc<Mutex<EngineAnalysisStore>>,
stop: Arc<AtomicBool>,
join: Mutex<Option<JoinHandle<()>>>,
autoplay_join: Mutex<Option<JoinHandle<crate::engine::EngineAnalyzeResult>>>,
autoplay_cancel: Arc<AtomicBool>,
session_gen: Arc<AtomicU64>,
active_fen: Mutex<String>,
}
impl Default for EngineStreamRuntime {
fn default() -> Self {
Self::new(Arc::new(Mutex::new(UciUcciEngine::new(None))))
}
}
impl EngineStreamRuntime {
pub fn new(engine: Arc<Mutex<UciUcciEngine>>) -> Self {
Self {
engine,
store: Arc::new(Mutex::new(EngineAnalysisStore::empty_for_fen(""))),
stop: Arc::new(AtomicBool::new(false)),
join: Mutex::new(None),
autoplay_join: Mutex::new(None),
autoplay_cancel: Arc::new(AtomicBool::new(false)),
session_gen: Arc::new(AtomicU64::new(0)),
active_fen: Mutex::new(String::new()),
}
}
fn infinite_thread_active(&self) -> bool {
lock_mutex(&self.join)
.as_ref()
.is_some_and(|h| !h.is_finished())
}
pub fn needs_process_release(&self) -> bool {
if self.infinite_thread_active() || self.is_autoplay_running() {
return false;
}
self.engine
.lock()
.map(|eng| eng.has_child_process())
.unwrap_or(false)
}
pub fn release_engine_process(&self) {
if !self.needs_process_release() {
return;
}
self.stop_infinite_stream_blocking();
self.stop_autoplay_blocking();
if let Ok(eng) = self.engine.lock() {
eng.terminate_locked();
}
}
pub fn store_revision(&self) -> u64 {
lock_mutex(&self.store).revision
}
pub fn clone_store(&self) -> EngineAnalysisStore {
lock_mutex(&self.store).clone()
}
pub fn configure_engine(&self, cfg: &EngineConfig) {
let path = cfg.path.trim();
if path.is_empty() {
return;
}
if let Ok(mut eng) = self.engine.lock() {
eng.configure(EngineConfigureRequest {
engine_path: Some(path.to_string()),
threads: Some(i32::from(cfg.threads)),
hash_mb: Some(cfg.hash_mb as i32),
repetition_rule: Some(cfg.variant.clone()),
draw_rule: Some(cfg.rule.clone()),
skill_level: Some(i32::from(cfg.skill_level)),
engine_protocol_preference: Some(cfg.protocol.preference().to_string()),
engine_config_path: None,
protocol_detected_for_path: None,
protocol_detected: None,
});
}
}
pub fn is_running(&self) -> bool {
self.infinite_thread_active()
}
pub fn active_fen(&self) -> String {
lock_mutex(&self.active_fen).clone()
}
pub fn ensure_stream(&self, fen: &str, cfg: &EngineConfig, want_stream: bool) {
if !want_stream || cfg.path.trim().is_empty() {
if self.infinite_thread_active() {
self.stop_infinite_stream();
}
return;
}
let fen = fen.trim().to_string();
if self.infinite_thread_active() {
if self.active_fen() == fen {
return;
}
self.stop_infinite_stream_blocking();
}
self.configure_engine(cfg);
let session = self.session_gen.fetch_add(1, Ordering::SeqCst) + 1;
*lock_mutex(&self.active_fen) = fen.clone();
{
let mut guard = lock_mutex(&self.store);
guard.reset_for_stream(&fen);
}
self.stop.store(false, Ordering::SeqCst);
let eng = self.engine.clone();
let store = self.store.clone();
let stop = self.stop.clone();
let live_session = self.session_gen.clone();
let multipv = i32::from(cfg.multi_pv.max(1));
let handle = thread::spawn(move || {
if let Ok(mut engine) = eng.lock() {
engine.run_infinite_analysis(&fen, &store, &stop, &live_session, session, multipv);
}
});
*lock_mutex(&self.join) = Some(handle);
}
pub fn stop_infinite_stream(&self) {
self.stop_infinite_stream_inner(STREAM_JOIN_TIMEOUT);
}
fn stop_infinite_stream_blocking(&self) {
self.stop_infinite_stream_inner(STREAM_JOIN_TIMEOUT + Duration::from_secs(3));
}
fn stop_infinite_stream_inner(&self, join_timeout: Duration) {
self.stop.store(true, Ordering::SeqCst);
self.session_gen.fetch_add(1, Ordering::SeqCst);
let deadline = Instant::now() + join_timeout;
loop {
let mut slot = lock_mutex(&self.join);
let Some(handle) = slot.take() else {
break;
};
if handle.is_finished() {
drop(slot);
let _ = handle.join();
continue;
}
if Instant::now() >= deadline {
*slot = Some(handle);
runtime_log::warn(
"[engine_stream] infinite join timed out; stream stays inactive until thread exits",
);
break;
}
drop(slot);
thread::sleep(Duration::from_millis(10));
}
self.stop.store(false, Ordering::SeqCst);
*lock_mutex(&self.active_fen) = String::new();
}
pub fn stop_all(&self) {
self.stop_infinite_stream_blocking();
self.stop_autoplay_blocking();
}
pub fn is_autoplay_running(&self) -> bool {
let guard = lock_mutex(&self.autoplay_join);
guard.as_ref().is_some_and(|handle| !handle.is_finished())
}
pub fn spawn_autoplay_once(&self, fen: &str, cfg: &EngineConfig) {
self.stop_infinite_stream();
self.stop_autoplay();
self.autoplay_cancel.store(false, Ordering::SeqCst);
let fen = fen.trim().to_string();
{
let mut guard = lock_mutex(&self.store);
guard.reset_for_stream(&fen);
}
self.configure_engine(cfg);
let eng = self.engine.clone();
let store = self.store.clone();
let cfg = cfg.clone();
let cancel = self.autoplay_cancel.clone();
let handle = thread::spawn(move || {
if let Ok(mut engine) = eng.lock() {
let (depth, movetime_ms, search_nodes) = cfg.analyze_go_args();
engine.analyze_autoplay_once_with_cancel(
fen.as_str(),
depth,
movetime_ms,
search_nodes,
Some(&store),
Some(&cancel),
)
} else {
crate::engine::EngineAnalyzeResult::default()
}
});
*lock_mutex(&self.autoplay_join) = Some(handle);
}
pub fn poll_autoplay_done(&self) -> Option<crate::engine::EngineAnalyzeResult> {
let mut slot = lock_mutex(&self.autoplay_join);
let handle = slot.as_ref()?;
if !handle.is_finished() {
return None;
}
let handle = slot.take()?;
handle.join().ok()
}
pub fn stop_autoplay(&self) {
self.autoplay_cancel.store(true, Ordering::SeqCst);
let handle = lock_mutex(&self.autoplay_join).take();
if let Some(handle) = handle {
if handle.is_finished() {
let _ = handle.join();
} else {
thread::spawn(move || {
let _ = handle.join();
});
}
}
self.autoplay_cancel.store(false, Ordering::SeqCst);
}
pub fn stop_autoplay_blocking(&self) {
self.autoplay_cancel.store(true, Ordering::SeqCst);
if let Some(handle) = lock_mutex(&self.autoplay_join).take() {
let _ = handle.join();
}
self.autoplay_cancel.store(false, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::EngineProtocol;
fn sample_config(path: &str) -> EngineConfig {
EngineConfig {
path: path.to_string(),
protocol: EngineProtocol::Uci,
threads: 2,
hash_mb: 128,
..EngineConfig::default()
}
}
#[test]
fn ensure_stream_noop_when_disabled() {
let rt = EngineStreamRuntime::default();
let cfg = sample_config("C:\\engines\\fake.exe");
rt.ensure_stream("fen", &cfg, false);
assert!(!rt.is_running());
}
#[test]
fn ensure_stream_noop_when_path_empty() {
let rt = EngineStreamRuntime::default();
let cfg = sample_config("");
rt.ensure_stream("fen", &cfg, true);
assert!(!rt.is_running());
}
#[test]
fn stop_infinite_stream_is_idempotent() {
let rt = EngineStreamRuntime::default();
rt.stop_infinite_stream();
rt.stop_infinite_stream();
assert!(!rt.is_running());
}
}