Skip to main content

katago_analysis/engine/
mod.rs

1//! KataGo's analysis protocol, implemented as a low-level stream of unsynchronized messages.
2//!
3//! See [KataGo Parallel Analysis Engine](https://github.com/lightvector/KataGo/blob/master/docs/Analysis_Engine.md)
4//! for official documentation of the analysis engine.
5//!
6//! You probably want to use the higher-level API in the [crate root](crate) instead of this module. This module is intended
7//! for use cases that require lower-level control over the messages sent to and received from the engine.
8//!
9//! Note: The asynchronous methods in this library must be called from within a Tokio runtime.
10//!
11//! # Example
12//!
13//! ```
14//! use katago_analysis::{
15//!     Player, Result, Rules,
16//!     engine::{AnalysisRequest, AnalysisResponse, Engine, LaunchOptions, Request, Response},
17//! };
18//! use tokio_stream::StreamExt;
19//!
20//! async fn example(
21//!     katago_path: String,
22//!     analysis_config_path: String,
23//!     model_path: String,
24//! ) -> Result<()> {
25//!     let options = LaunchOptions::new(katago_path, analysis_config_path, model_path);
26//!     let mut engine = Engine::launch(&options)?;
27//!
28//!     let request = AnalysisRequest::new(
29//!         "1".to_string(),
30//!         Rules::chinese(),
31//!         19,
32//!         19,
33//!         vec![
34//!             (Player::Black, "Q16".to_string()),
35//!             (Player::White, "D4".to_string()),
36//!         ],
37//!     );
38//!     engine.stdin.send(&Request::Analyze(request)).await?;
39//!     match engine.stdout.try_next().await? {
40//!         Some(Response::Analyze(AnalysisResponse { move_infos, .. })) => {
41//!             println!(
42//!                 "Best move: {} ({:.1}%)",
43//!                 move_infos[0].mv,
44//!                 move_infos[0].winrate * 100.0
45//!             );
46//!             println!("{:?}", move_infos[0]);
47//!         }
48//!         _ => println!("Something went wrong"),
49//!     };
50//!     Ok(())
51//! }
52//! ```
53
54use std::{io, process::Stdio};
55
56use tokio::{
57    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
58    process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
59};
60use tokio_stream::{StreamExt, wrappers::LinesStream};
61
62use crate::{Config, Error, Result};
63
64mod request;
65pub use request::*;
66
67mod response;
68pub use response::*;
69
70/// Command line options for launching KataGo.
71#[derive(Debug, Clone)]
72pub struct LaunchOptions {
73    /// The path to the KataGo executable.
74    pub katago_path: String,
75
76    /// The path to the config file.
77    pub config_path: String,
78
79    /// The path to the model file.
80    pub model_path: String,
81
82    /// If true, KataGo's stderr output will be produced on the current process's stderr.
83    /// Otherwise, it will be made available as [`Engine::stderr`].
84    pub inherit_stderr: bool,
85
86    /// The path to the humanSL model file.
87    pub human_model_path: Option<String>,
88
89    /// Overrides to pass via `-override-config`.
90    pub override_config: Option<Config>,
91
92    /// If true, the engine will stop immediately when stdin is closed, instead of responding to pending requests.
93    pub quit_without_waiting: bool,
94}
95
96impl LaunchOptions {
97    /// Creates a new [`LaunchOptions`].
98    pub fn new(katago_path: String, config_path: String, model_path: String) -> Self {
99        Self {
100            katago_path,
101            config_path,
102            model_path,
103            inherit_stderr: false,
104            human_model_path: None,
105            override_config: None,
106            quit_without_waiting: false,
107        }
108    }
109
110    /// Causes KataGo's stderr output to be produced on the current process's stderr.
111    pub fn with_inherit_stderr(mut self) -> Self {
112        self.inherit_stderr = true;
113        self
114    }
115
116    /// Loads the humanSL model from the given path.
117    pub fn with_human_model(mut self, human_model_path: String) -> Self {
118        self.human_model_path = Some(human_model_path);
119        self
120    }
121
122    /// Passes the given options via `-override-config`.
123    pub fn with_override_config(mut self, config: Config) -> Self {
124        self.override_config = Some(config);
125        self
126    }
127
128    /// Stop immediately when stdin is closed, instead of responding to pending requests.
129    pub fn with_quit_without_waiting(mut self) -> Self {
130        self.quit_without_waiting = true;
131        self
132    }
133}
134
135/// An instance of the KataGo analysis engine, launched as a child process.
136#[derive(Debug)]
137pub struct Engine {
138    /// Sends requests to the analysis engine.
139    ///
140    /// Drop this to close the engine's stdin and request KataGo to exit.
141    pub stdin: EngineStdin,
142
143    /// A [`Stream`][futures_core::stream::Stream] of [`Response`]s from the analysis engine.
144    pub stdout: EngineStdout,
145
146    /// The analysis engine's stderr output, if available.
147    pub stderr: Option<ChildStderr>,
148
149    /// The engine process.
150    pub child_process: Child,
151}
152
153impl Engine {
154    /// Launches the KataGo analysis engine with the given options.
155    pub fn launch(config: &LaunchOptions) -> Result<Engine> {
156        let mut cmd = Command::new(&config.katago_path);
157        cmd.stdin(Stdio::piped())
158            .stdout(Stdio::piped())
159            .stderr(if config.inherit_stderr {
160                Stdio::inherit()
161            } else {
162                Stdio::piped()
163            })
164            .arg("analysis")
165            .arg("-config")
166            .arg(&config.config_path)
167            .arg("-model")
168            .arg(&config.model_path);
169
170        if let Some(human_model_path) = &config.human_model_path {
171            cmd.arg("-human-model").arg(human_model_path);
172        }
173
174        if let Some(override_config) = &config.override_config {
175            cmd.arg("-override-config").arg(
176                override_config
177                    .to_command_line_arg()
178                    .map_err(Error::UnserializableConfig)?,
179            );
180        }
181
182        if config.quit_without_waiting {
183            cmd.arg("-quit-without-waiting");
184        }
185
186        let mut child_process = cmd.spawn()?;
187        let stdin = child_process.stdin.take().ok_or(Error::StdinUnavailable)?;
188        let stdout = child_process
189            .stdout
190            .take()
191            .ok_or(Error::StdoutUnavailable)?;
192        let stdout_stream: EngineStdout = LinesStream::new(BufReader::new(stdout).lines())
193            .map(|line| Ok(serde_json::from_str::<Response>(&line?)?));
194
195        Ok(Engine {
196            stdin: EngineStdin(stdin),
197            stdout: stdout_stream,
198            stderr: child_process.stderr.take(),
199            child_process,
200        })
201    }
202}
203
204/// Sends requests to the analysis engine.
205///
206/// When dropped, this will close the engine's stdin and request KataGo to exit.
207#[derive(Debug)]
208pub struct EngineStdin(ChildStdin);
209
210impl EngineStdin {
211    /// Sends a [`Request`] to the analysis engine.
212    pub async fn send(&mut self, request: &Request) -> Result<()> {
213        let json = serde_json::to_string(request)?;
214        self.send_raw(&json).await
215    }
216
217    /// Sends a raw string to the analysis engine.
218    pub async fn send_raw(&mut self, request: &str) -> Result<()> {
219        self.0.write_all(request.as_bytes()).await?;
220        self.0.write_all(b"\n").await?;
221        Ok(())
222    }
223}
224
225/// A [`Stream`][futures_core::stream::Stream] of [`Response`]s from the analysis engine.
226pub type EngineStdout = tokio_stream::adapters::Map<
227    LinesStream<BufReader<ChildStdout>>,
228    fn(std::result::Result<String, io::Error>) -> Result<Response>,
229>;