use crate::metrics::SwanlingMetrics;
use crate::util;
use crate::{
AttackPhase, SwanlingAttack, SwanlingAttackRunState, SwanlingConfiguration, SwanlingError,
};
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use regex::{Regex, RegexSet};
use serde::{Deserialize, Serialize};
use std::io;
use std::str;
use std::str::FromStr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tungstenite::Message;
/// Wire protocol spoken by a controller listener; `controller_main` is started with one of these.
#[derive(Clone, Debug)]
pub(crate) enum SwanlingControllerProtocol {
/// Plain-text commands read directly from the TCP stream, answered with a `swanling> ` prompt.
Telnet,
/// JSON requests and responses exchanged as WebSocket text messages.
WebSocket,
}
/// Commands a controller client can issue.
///
/// The declaration order matters: `get_match` casts variants with `as usize` and uses the
/// result as an index into the `RegexSet` (and, for the first four, the capture regexes) built
/// in `controller_main`. Variants must therefore stay in the same order as those patterns.
#[derive(Clone, Debug, PartialEq)]
pub enum SwanlingControllerCommand {
/// Set the host to load test; the request carries the new host as its value.
Host,
/// Set the number of simulated users; the request carries the number as its value.
Users,
/// Set the per-second hatch rate; the request carries the rate as its value.
HatchRate,
/// Set how long the test runs; the request carries the time as its value.
RunTime,
/// Display the load test configuration.
Config,
/// Display the load test configuration in JSON format.
ConfigJson,
/// Display metrics for the current load test.
Metrics,
/// Display metrics for the current load test in JSON format.
MetricsJson,
/// Display the help text; answered locally by the controller thread.
Help,
/// Close the controller connection; answered locally by the controller thread.
Exit,
/// Start an idle load test.
Start,
/// Stop a running load test and return to idle.
Stop,
/// Shut down the load test and exit the controller.
Shutdown,
}
/// A parsed controller command, as produced by `get_match`.
#[derive(Debug)]
pub(crate) struct SwanlingControllerRequestMessage {
/// Which command the client issued.
pub command: SwanlingControllerCommand,
/// Argument captured from the command line; `get_match` sets it only for `Host`, `Users`,
/// `HatchRate` and `RunTime`, and leaves it `None` for every other command.
pub value: Option<String>,
}
/// Payload sent back to a controller thread in answer to a request.
#[derive(Debug)]
pub(crate) enum SwanlingControllerResponseMessage {
/// Whether the requested action was carried out.
Bool(bool),
/// A boxed copy of the load test configuration.
Config(Box<SwanlingConfiguration>),
/// A boxed copy of the load test metrics.
Metrics(Box<SwanlingMetrics>),
}
/// Envelope a controller thread sends over the shared channel to the load test.
#[derive(Debug)]
pub(crate) struct SwanlingControllerRequest {
/// One-shot channel on which the reply is sent; no reply is sent when this is `None`.
pub response_channel: Option<tokio::sync::oneshot::Sender<SwanlingControllerResponse>>,
/// Identifier of the controller client that issued the request.
pub client_id: u32,
/// The parsed command and its optional value.
pub request: SwanlingControllerRequestMessage,
}
/// Reply delivered to a controller thread over its one-shot channel.
#[derive(Debug)]
pub(crate) struct SwanlingControllerResponse {
/// Identifier of the controller client the reply is addressed to.
pub client_id: u32,
/// The reply payload.
pub response: SwanlingControllerResponseMessage,
}
/// JSON body expected from a WebSocket client.
#[derive(Debug, Deserialize, Serialize)]
pub struct SwanlingControllerWebSocketRequest {
/// The command string; it is matched against the same patterns as a telnet command line.
pub request: String,
}
/// JSON body sent to a WebSocket client in answer to a request.
#[derive(Debug, Deserialize, Serialize)]
pub struct SwanlingControllerWebSocketResponse {
/// The response text on success, or the error text on failure.
pub response: String,
/// `true` when the request succeeded, `false` when `response` holds an error.
pub success: bool,
}
/// Returned by `execute_command`: `true` when the client connection should be closed.
type SwanlingControllerExit = bool;
/// Fixed-size buffer holding the bytes read from a telnet client.
type SwanlingControllerTelnetMessage = [u8; 1024];
/// One item yielded by the WebSocket receive stream: a message or a protocol error.
type SwanlingControllerWebSocketMessage =
std::result::Result<tungstenite::Message, tungstenite::Error>;
/// Sending half of a split WebSocket stream layered over a TCP connection.
type SwanlingControllerWebSocketSender = futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
tungstenite::Message,
>;
/// Per-connection state; `controller_main` builds one for every accepted client.
pub(crate) struct SwanlingControllerState {
/// Sequential identifier of this client, used in log lines and as the request `client_id`.
thread_id: u32,
/// Printable peer address, or `UNKNOWN ADDRESS` when it could not be determined.
peer_address: String,
/// Channel used to send requests to the load test.
channel_tx: flume::Sender<SwanlingControllerRequest>,
/// All command patterns, indexed by `SwanlingControllerCommand as usize`.
commands: RegexSet,
/// Capture regexes for the commands that carry a value (the first four commands).
captures: Vec<Regex>,
/// Protocol this connection speaks.
protocol: SwanlingControllerProtocol,
}
impl SwanlingControllerState {
async fn accept_connections(self, mut socket: tokio::net::TcpStream) {
info!(
"{:?} client [{}] connected from {}",
self.protocol, self.thread_id, self.peer_address
);
match self.protocol {
SwanlingControllerProtocol::Telnet => {
let mut buf: SwanlingControllerTelnetMessage = [0; 1024];
write_to_socket_raw(&mut socket, "swanling> ").await;
loop {
let n = match socket.read(&mut buf).await {
Ok(data) => data,
Err(_) => {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
};
if n == 0 {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
if let Ok(command_string) = self.get_command_string(buf).await {
if let Ok(request_message) = self.get_match(&command_string.trim()).await {
if self.execute_command(&mut socket, request_message).await {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
} else {
self.write_to_socket(
&mut socket,
Err("unrecognized command".to_string()),
)
.await;
}
} else {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
}
}
SwanlingControllerProtocol::WebSocket => {
let stream = match tokio_tungstenite::accept_async(socket).await {
Ok(s) => s,
Err(e) => {
info!("invalid WebSocket handshake: {}", e);
return;
}
};
let (mut ws_sender, mut ws_receiver) = stream.split();
loop {
let data = match ws_receiver.next().await {
Some(d) => (d),
None => {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
};
if let Ok(command_string) = self.get_command_string(data).await {
if let Ok(request_message) = self.get_match(&command_string.trim()).await {
if self.execute_command(&mut ws_sender, request_message).await {
info!(
"Telnet client [{}] disconnected from {}",
self.thread_id, self.peer_address
);
break;
}
} else {
self.write_to_socket(
&mut ws_sender,
Err("unrecognized command, see Swanling README.md".to_string()),
)
.await;
}
} else {
self.write_to_socket(
&mut ws_sender,
Err("unable to parse json, see Swanling README.md".to_string()),
)
.await;
}
}
}
}
}
async fn get_match(
&self,
command_string: &str,
) -> Result<SwanlingControllerRequestMessage, ()> {
let matches = self.commands.matches(&command_string);
if matches.matched(SwanlingControllerCommand::Help as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Help,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Exit as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Exit,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Start as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Start,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Stop as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Stop,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Shutdown as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Shutdown,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Config as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Config,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::ConfigJson as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::ConfigJson,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Metrics as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Metrics,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::MetricsJson as usize) {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::MetricsJson,
value: None,
})
} else if matches.matched(SwanlingControllerCommand::Host as usize) {
let caps = self.captures[SwanlingControllerCommand::Host as usize]
.captures(command_string)
.unwrap();
let host = caps.get(2).map_or("", |m| m.as_str());
if util::is_valid_host(host).is_ok() {
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Host,
value: Some(host.to_string()),
})
} else {
debug!("invalid host: {}", host);
Err(())
}
} else if matches.matched(SwanlingControllerCommand::Users as usize) {
let caps = self.captures[SwanlingControllerCommand::Users as usize]
.captures(command_string)
.unwrap();
let users = caps.get(2).map_or("", |m| m.as_str());
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::Users,
value: Some(users.to_string()),
})
} else if matches.matched(SwanlingControllerCommand::HatchRate as usize) {
let caps = self.captures[SwanlingControllerCommand::HatchRate as usize]
.captures(command_string)
.unwrap();
let hatch_rate = caps.get(2).map_or("", |m| m.as_str());
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::HatchRate,
value: Some(hatch_rate.to_string()),
})
} else if matches.matched(SwanlingControllerCommand::RunTime as usize) {
let caps = self.captures[SwanlingControllerCommand::RunTime as usize]
.captures(command_string)
.unwrap();
let run_time = caps.get(2).map_or("", |m| m.as_str());
Ok(SwanlingControllerRequestMessage {
command: SwanlingControllerCommand::RunTime,
value: Some(run_time.to_string()),
})
} else {
Err(())
}
}
/// Answers the commands the controller thread can handle by itself.
///
/// Returns the text to send to the client for `Help` and `Exit`, and `None` for every command
/// that has to be forwarded to the load test.
fn process_local_command(
    &self,
    request_message: &SwanlingControllerRequestMessage,
) -> Option<String> {
    let command = &request_message.command;
    if *command == SwanlingControllerCommand::Help {
        Some(display_help())
    } else if *command == SwanlingControllerCommand::Exit {
        Some(String::from("goodbye!"))
    } else {
        None
    }
}
/// Forwards a request to the load test and waits for its reply.
///
/// A fresh one-shot channel is attached to the request so the reply comes back to this client.
/// Returns an error string when the request cannot be queued or when the one-shot sender is
/// dropped without a reply.
async fn process_command(
    &self,
    request: SwanlingControllerRequestMessage,
) -> Result<SwanlingControllerResponseMessage, String> {
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<SwanlingControllerResponse>();

    let envelope = SwanlingControllerRequest {
        response_channel: Some(reply_tx),
        client_id: self.thread_id,
        request,
    };
    if self.channel_tx.try_send(envelope).is_err() {
        return Err("parent process has closed the controller channel".to_string());
    }

    reply_rx
        .await
        .map(|reply| reply.response)
        .map_err(|e| format!("one-shot channel dropped without reply: {}", e))
}
fn process_response(
&self,
command: SwanlingControllerCommand,
response: SwanlingControllerResponseMessage,
) -> Result<String, String> {
match command {
SwanlingControllerCommand::Host => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("host configured".to_string())
} else {
Err(
"failed to reconfigure host, be sure host is valid and load test is idle"
.to_string(),
)
}
}
SwanlingControllerCommand::Users => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("users configured".to_string())
} else {
Err("load test not idle, failed to reconfigure users".to_string())
}
}
SwanlingControllerCommand::HatchRate => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("hatch_rate configured".to_string())
} else {
Err("failed to configure hatch_rate".to_string())
}
}
SwanlingControllerCommand::RunTime => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("run_time configured".to_string())
} else {
Err("failed to configure run_time".to_string())
}
}
SwanlingControllerCommand::Config => {
if let SwanlingControllerResponseMessage::Config(config) = response {
Ok(format!("{:#?}", config))
} else {
Err("error loading configuration".to_string())
}
}
SwanlingControllerCommand::ConfigJson => {
if let SwanlingControllerResponseMessage::Config(config) = response {
Ok(serde_json::to_string(&config).expect("unexpected serde failure"))
} else {
Err("error loading configuration".to_string())
}
}
SwanlingControllerCommand::Metrics => {
if let SwanlingControllerResponseMessage::Metrics(metrics) = response {
Ok(metrics.to_string())
} else {
Err("error loading metrics".to_string())
}
}
SwanlingControllerCommand::MetricsJson => {
if let SwanlingControllerResponseMessage::Metrics(metrics) = response {
Ok(serde_json::to_string(&metrics).expect("unexpected serde failure"))
} else {
Err("error loading metrics".to_string())
}
}
SwanlingControllerCommand::Start => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("load test started".to_string())
} else {
Err(
"unable to start load test, be sure it is idle and host is configured"
.to_string(),
)
}
}
SwanlingControllerCommand::Stop => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("load test stopped".to_string())
} else {
Err("load test not running, failed to stop".to_string())
}
}
SwanlingControllerCommand::Shutdown => {
if let SwanlingControllerResponseMessage::Bool(true) = response {
Ok("load test shut down".to_string())
} else {
Err("failed to shut down load test".to_string())
}
}
SwanlingControllerCommand::Help | SwanlingControllerCommand::Exit => {
let e = "received an impossible HELP or EXIT command";
error!("{}", e);
Err(e.to_string())
}
}
}
}
/// Extracts a command string from one raw message of protocol-specific type `T`.
#[async_trait]
trait SwanlingController<T> {
/// Returns the command text contained in `raw_value`, or an error description when the
/// message cannot be interpreted.
async fn get_command_string(&self, raw_value: T) -> Result<String, String>;
}
#[async_trait]
impl SwanlingController<SwanlingControllerTelnetMessage> for SwanlingControllerState {
    /// Returns the first line of the telnet buffer, or an empty string when it has no lines.
    ///
    /// The whole buffer must be valid UTF-8; otherwise the problem is logged and returned as
    /// an error.
    async fn get_command_string(
        &self,
        raw_value: SwanlingControllerTelnetMessage,
    ) -> Result<String, String> {
        let text = str::from_utf8(&raw_value).map_err(|e| {
            let error = format!("ignoring unexpected input from telnet controller: {}", e);
            info!("{}", error);
            error
        })?;
        let first_line = text.lines().next().unwrap_or("");
        Ok(first_line.to_string())
    }
}
#[async_trait]
impl SwanlingController<SwanlingControllerWebSocketMessage> for SwanlingControllerState {
    /// Returns the `request` field of a JSON text message.
    ///
    /// Fails with a descriptive string when the stream yielded an error, when the message is
    /// not a text message, when it cannot be converted to text, or when the text is not JSON
    /// of the expected shape.
    async fn get_command_string(
        &self,
        raw_value: SwanlingControllerWebSocketMessage,
    ) -> Result<String, String> {
        let message = match raw_value {
            Ok(message) => message,
            Err(_) => return Err("WebSocket handshake error".to_string()),
        };
        if !message.is_text() {
            return Err("unsupported format, requests must be sent as text".to_string());
        }
        let text = match message.into_text() {
            Ok(text) => text,
            Err(_) => return Err("unsupported string format".to_string()),
        };
        debug!("websocket request: {:?}", text.trim());
        match serde_json::from_str::<SwanlingControllerWebSocketRequest>(&text) {
            Ok(parsed) => Ok(parsed.request),
            Err(_) => {
                Err("unrecognized json request, refer to Swanling README.md".to_string())
            }
        }
    }
}
/// Runs a command and reports the outcome to a client over a transport of type `T`.
#[async_trait]
trait SwanlingControllerExecuteCommand<T> {
/// Executes `request_message`, writes the outcome to `socket`, and returns `true` when the
/// connection should be closed afterwards.
async fn execute_command(
&self,
socket: &mut T,
request_message: SwanlingControllerRequestMessage,
) -> SwanlingControllerExit;
/// Sends `response_message` to the client: `Ok` carries a success text, `Err` an error text.
async fn write_to_socket(&self, socket: &mut T, response_message: Result<String, String>);
}
#[async_trait]
impl SwanlingControllerExecuteCommand<tokio::net::TcpStream> for SwanlingControllerState {
    /// Executes one telnet command and writes the outcome to the client.
    ///
    /// `Help` and `Exit` are answered locally; everything else is forwarded to the load test.
    /// Returns `true` (close the connection) after `Exit`, after `Shutdown`, or when the
    /// request could not be forwarded or was never answered.
    async fn execute_command(
        &self,
        socket: &mut tokio::net::TcpStream,
        request_message: SwanlingControllerRequestMessage,
    ) -> SwanlingControllerExit {
        if let Some(message) = self.process_local_command(&request_message) {
            self.write_to_socket(socket, Ok(message)).await;
            return request_message.command == SwanlingControllerCommand::Exit;
        }

        // The request is moved into `process_command`, so keep a copy of the command.
        let command = request_message.command.clone();
        let response = match self.process_command(request_message).await {
            Ok(r) => r,
            Err(e) => {
                self.write_to_socket(socket, Err(e)).await;
                return true;
            }
        };

        let exit_controller = command == SwanlingControllerCommand::Shutdown;
        self.write_to_socket(socket, self.process_response(command, response))
            .await;
        exit_controller
    }

    /// Writes the message, success or error alike, followed by a newline and a fresh
    /// `swanling> ` prompt. A failed write is logged and otherwise ignored.
    async fn write_to_socket(
        &self,
        socket: &mut tokio::net::TcpStream,
        message: Result<String, String>,
    ) {
        // Telnet clients see plain text, so success and error texts are written the same way.
        let response_message = match message {
            Ok(m) => m,
            Err(e) => e,
        };
        let payload = [response_message.as_str(), "\nswanling> "].concat();
        if socket.write_all(payload.as_bytes()).await.is_err() {
            warn!("failed to write data to socket");
        }
    }
}
#[async_trait]
impl SwanlingControllerExecuteCommand<SwanlingControllerWebSocketSender>
    for SwanlingControllerState
{
    /// Executes one WebSocket command and sends the outcome to the client as JSON.
    ///
    /// `Config` and `Metrics` replies are rendered in their JSON variants. After `Exit` or
    /// `Shutdown` a normal Close frame is sent. Returns `true` (close the connection) after
    /// `Exit`, after `Shutdown`, or when the request could not be forwarded or was never
    /// answered.
    async fn execute_command(
        &self,
        socket: &mut SwanlingControllerWebSocketSender,
        request_message: SwanlingControllerRequestMessage,
    ) -> SwanlingControllerExit {
        if let Some(message) = self.process_local_command(&request_message) {
            self.write_to_socket(socket, Ok(message)).await;
            if request_message.command != SwanlingControllerCommand::Exit {
                return false;
            }
            let goodbye = Message::Close(Some(tungstenite::protocol::CloseFrame {
                code: tungstenite::protocol::frame::coding::CloseCode::Normal,
                reason: std::borrow::Cow::Borrowed("exit"),
            }));
            if socket.send(goodbye).await.is_err() {
                warn!("failed to write data to stream");
            }
            return true;
        }

        // WebSocket clients always receive the JSON rendering of config and metrics.
        let command = match request_message.command {
            SwanlingControllerCommand::Config => SwanlingControllerCommand::ConfigJson,
            SwanlingControllerCommand::Metrics => SwanlingControllerCommand::MetricsJson,
            _ => request_message.command.clone(),
        };

        let response = match self.process_command(request_message).await {
            Ok(reply) => reply,
            Err(e) => {
                self.write_to_socket(socket, Err(e)).await;
                return true;
            }
        };

        let shutting_down = command == SwanlingControllerCommand::Shutdown;
        let outcome = self.process_response(command, response);
        self.write_to_socket(socket, outcome).await;
        if !shutting_down {
            return false;
        }

        let goodbye = Message::Close(Some(tungstenite::protocol::CloseFrame {
            code: tungstenite::protocol::frame::coding::CloseCode::Normal,
            reason: std::borrow::Cow::Borrowed("shutdown"),
        }));
        if socket.send(goodbye).await.is_err() {
            warn!("failed to write data to stream");
        }
        true
    }

    /// Serializes the outcome as a `SwanlingControllerWebSocketResponse` and sends it as a
    /// text message. Serialization and send failures are logged and otherwise ignored.
    async fn write_to_socket(
        &self,
        socket: &mut SwanlingControllerWebSocketSender,
        response_result: Result<String, String>,
    ) {
        let (success, response) = match response_result {
            Ok(text) => (true, text),
            Err(text) => (false, text),
        };
        let body = SwanlingControllerWebSocketResponse { response, success };
        let json = match serde_json::to_string(&body) {
            Ok(json) => json,
            Err(e) => {
                warn!("failed to json encode response: {}", e);
                return;
            }
        };
        if let Err(e) = socket.send(Message::Text(json)).await {
            info!("failed to write data to websocket: {}", e);
        }
    }
}
pub(crate) async fn controller_main(
configuration: SwanlingConfiguration,
channel_tx: flume::Sender<SwanlingControllerRequest>,
protocol: SwanlingControllerProtocol,
) -> io::Result<()> {
let address = match &protocol {
SwanlingControllerProtocol::Telnet => format!(
"{}:{}",
configuration.telnet_host, configuration.telnet_port
),
SwanlingControllerProtocol::WebSocket => format!(
"{}:{}",
configuration.websocket_host, configuration.websocket_port
),
};
debug!(
"preparing to bind {:?} controller to: {}",
protocol, address
);
let listener = TcpListener::bind(&address).await?;
info!("{:?} controller listening on: {}", protocol, address);
let host_regex = r"(?i)^(host|hostname|host_name|host-name) ((https?)://.+)$";
let users_regex = r"(?i)^(users?) (\d+)$";
let hatchrate_regex = r"(?i)^(hatchrate|hatch_rate|hatch-rate) ([0-9]*(\.[0-9]*)?){1}$";
let runtime_regex =
r"(?i)^(run|runtime|run_time|run-time|) (\d+|((\d+?)h)?((\d+?)m)?((\d+?)s)?)$";
let commands = RegexSet::new(&[
host_regex,
users_regex,
hatchrate_regex,
runtime_regex,
r"(?i)^config$",
r"(?i)^(configjson|config-json)$",
r"(?i)^(metrics|stats)$",
r"(?i)^(metricsjson|metrics-json|statsjson|stats-json)$",
r"(?i)^(help|\?)$",
r"(?i)^(exit|quit)$",
r"(?i)^start$",
r"(?i)^stop$",
r"(?i)^shutdown$",
])
.unwrap();
let captures = vec![
Regex::new(host_regex).unwrap(),
Regex::new(users_regex).unwrap(),
Regex::new(hatchrate_regex).unwrap(),
Regex::new(runtime_regex).unwrap(),
];
let mut thread_id: u32 = 0;
while let Ok((stream, _)) = listener.accept().await {
thread_id += 1;
let peer_address = stream
.peer_addr()
.map_or("UNKNOWN ADDRESS".to_string(), |p| p.to_string());
let controller_state = SwanlingControllerState {
thread_id,
peer_address,
channel_tx: channel_tx.clone(),
commands: commands.clone(),
captures: captures.clone(),
protocol: protocol.clone(),
};
let _ = tokio::spawn(controller_state.accept_connections(stream));
}
Ok(())
}
/// Writes `message` to the socket exactly as given, with no newline or prompt appended.
/// A failed write is logged and otherwise ignored.
async fn write_to_socket_raw(socket: &mut tokio::net::TcpStream, message: &str) {
    let outcome = socket.write_all(message.as_bytes()).await;
    if outcome.is_err() {
        warn!("failed to write data to socket");
    }
}
/// Builds the help text listing all controller commands, prefixed with the crate name and
/// version taken from the `CARGO_PKG_NAME` and `CARGO_PKG_VERSION` build-time variables.
fn display_help() -> String {
format!(
r"{} {} controller commands:
help (?) this help
exit (quit) exit controller
start start an idle load test
stop stop a running load test and return to idle state
shutdown shutdown running load test (and exit controller)
host HOST set host to load test, ie http://localhost/
users INT set number of simulated users
hatchrate FLOAT set per-second rate users hatch
runtime TIME set how long to run test, ie 1h30m5s
config display load test configuration
config-json display load test configuration in json format
metrics display metrics for current load test
metrics-json display metrics for current load test in json format",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
)
}
impl SwanlingAttack {
pub(crate) fn reply_to_controller(
&mut self,
request: SwanlingControllerRequest,
response: SwanlingControllerResponseMessage,
) {
if let Some(oneshot_tx) = request.response_channel {
if oneshot_tx
.send(SwanlingControllerResponse {
client_id: request.client_id,
response,
})
.is_err()
{
warn!("failed to send response to controller via one-shot channel")
}
}
}
/// Handles at most one pending controller request, without waiting for one to arrive.
///
/// Returns immediately when no controller channel is configured or when no request is
/// queued. Otherwise the request is applied to the load test and answered through
/// `reply_to_controller`:
///
/// * `Config`/`ConfigJson` and `Metrics`/`MetricsJson` reply with a copy of the current
///   configuration or metrics.
/// * `Start` succeeds only when idle and `prepare_load_test` succeeds.
/// * `Stop` succeeds only while starting or running.
/// * `Shutdown` always succeeds and moves the load test to the stopping phase.
/// * `Host` and `Users` are applied only while idle; `HatchRate` and `RunTime` are applied in
///   any phase.
/// * A `Users` value that does not fit in a `usize` is rejected with a `false` reply.
///
/// # Errors
///
/// Propagates errors from `reset_run_state` (on `Start`) and `set_run_time` (on `RunTime`).
pub(crate) async fn handle_controller_requests(
    &mut self,
    swanling_attack_run_state: &mut SwanlingAttackRunState,
) -> Result<(), SwanlingError> {
    let message = match swanling_attack_run_state.controller_channel_rx.as_ref() {
        Some(c) => match c.try_recv() {
            Ok(message) => message,
            Err(e) => {
                debug!("error receiving message: {}", e);
                return Ok(());
            }
        },
        None => return Ok(()),
    };
    info!(
        "request from controller client {}: {:?}",
        message.client_id, message.request
    );

    match &message.request.command {
        SwanlingControllerCommand::Config | SwanlingControllerCommand::ConfigJson => {
            self.reply_to_controller(
                message,
                SwanlingControllerResponseMessage::Config(Box::new(
                    self.configuration.clone(),
                )),
            );
        }
        SwanlingControllerCommand::Metrics | SwanlingControllerCommand::MetricsJson => {
            self.reply_to_controller(
                message,
                SwanlingControllerResponseMessage::Metrics(Box::new(self.metrics.clone())),
            );
        }
        SwanlingControllerCommand::Start => {
            // `prepare_load_test` is only attempted when the load test is idle.
            if self.attack_phase == AttackPhase::Idle && self.prepare_load_test().is_ok() {
                self.set_attack_phase(swanling_attack_run_state, AttackPhase::Starting);
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(true),
                );
                self.reset_run_state(swanling_attack_run_state).await?;
            } else {
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(false),
                );
            }
        }
        SwanlingControllerCommand::Stop => {
            if [AttackPhase::Starting, AttackPhase::Running].contains(&self.attack_phase) {
                self.set_attack_phase(swanling_attack_run_state, AttackPhase::Stopping);
                swanling_attack_run_state.shutdown_after_stop = false;
                self.configuration.no_autostart = true;
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(true),
                );
            } else {
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(false),
                );
            }
        }
        SwanlingControllerCommand::Shutdown => {
            if self.attack_phase == AttackPhase::Idle {
                self.metrics.display_metrics = false;
            }
            swanling_attack_run_state.shutdown_after_stop = true;
            self.set_attack_phase(swanling_attack_run_state, AttackPhase::Stopping);
            self.reply_to_controller(message, SwanlingControllerResponseMessage::Bool(true));
        }
        SwanlingControllerCommand::Host => {
            if self.attack_phase == AttackPhase::Idle {
                if let Some(host) = &message.request.value {
                    info!(
                        "changing host from {:?} to {}",
                        self.configuration.host, host
                    );
                    self.configuration.host = host.to_string();
                    self.reply_to_controller(
                        message,
                        SwanlingControllerResponseMessage::Bool(true),
                    );
                } else {
                    warn!("Controller didn't provide host: {:#?}", &message.request);
                }
            } else {
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(false),
                );
            }
        }
        SwanlingControllerCommand::Users => {
            if self.attack_phase == AttackPhase::Idle {
                if let Some(users) = &message.request.value {
                    // The value comes from a client. The command pattern only guarantees
                    // digits, not that the number fits in a `usize`, so a parse failure is
                    // reported to the client instead of panicking the load test.
                    match usize::from_str(users) {
                        Ok(new_users) => {
                            info!(
                                "changing users from {:?} to {}",
                                self.configuration.users, users
                            );
                            self.configuration.users = Some(new_users);
                            self.reply_to_controller(
                                message,
                                SwanlingControllerResponseMessage::Bool(true),
                            );
                        }
                        Err(e) => {
                            warn!("ignoring invalid users value {:?}: {}", users, e);
                            self.reply_to_controller(
                                message,
                                SwanlingControllerResponseMessage::Bool(false),
                            );
                        }
                    }
                } else {
                    warn!("Controller didn't provide users: {:#?}", &message.request);
                }
            } else {
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(false),
                );
            }
        }
        SwanlingControllerCommand::HatchRate => {
            if let Some(hatch_rate) = &message.request.value {
                info!(
                    "changing hatch_rate from {:?} to {}",
                    self.configuration.hatch_rate, hatch_rate
                );
                self.configuration.hatch_rate = Some(hatch_rate.clone());
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(true),
                );
            } else {
                warn!(
                    "Controller didn't provide hatch_rate: {:#?}",
                    &message.request
                );
            }
        }
        SwanlingControllerCommand::RunTime => {
            if let Some(run_time) = &message.request.value {
                info!(
                    "changing run_time from {:?} to {}",
                    self.configuration.run_time, run_time
                );
                self.configuration.run_time = run_time.clone();
                self.set_run_time()?;
                self.reply_to_controller(
                    message,
                    SwanlingControllerResponseMessage::Bool(true),
                );
            } else {
                warn!(
                    "Controller didn't provide run_time: {:#?}",
                    &message.request
                );
            }
        }
        // These are answered inside the controller thread and should never be forwarded.
        SwanlingControllerCommand::Help | SwanlingControllerCommand::Exit => {
            warn!("Unexpected command: {:?}", &message.request);
        }
    }
    Ok(())
}
}