extern crate std;
use core::time::Duration;
use alloc::collections::vec_deque::VecDeque;
use crate::{
ConstString, Mutex, XmlCreator,
behavior::{BehaviorState, behavior_data::BehaviorData},
tree::{
observer::groot2_protocol::{Groot2ReplyHeader, Groot2RequestHeader, Groot2RequestType, Groot2TransitionInfo},
tree::{BehaviorTree, BehaviorTreeMessage},
},
};
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use bytes::{Bytes, BytesMut};
use thingbuf::mpsc;
use tokio::{task::JoinHandle, time::Instant};
use zeromq::{Socket, SocketRecv, SocketSend, ZmqMessage};
const TRANSITION_SIZE: u32 = 100;
pub const GROOT_STATE: &str = "groot_state";
pub fn attach_groot_callback(tree: &mut BehaviorTree, shared: Arc<Mutex<Groot2ConnectorData>>) {
let id: ConstString = GROOT_STATE.into();
let shared = shared;
for element in tree.iter_mut() {
let shared_clone = shared.clone();
let callback = move |behavior: &BehaviorData, new_state: &mut BehaviorState| {
if behavior.state() != *new_state {
if behavior.uid() != 0 {
let state = if *new_state == BehaviorState::Idle {
behavior.state() as u8 + 10
} else {
*new_state as u8
};
let mut shared_guard = shared_clone.lock();
let uid = behavior.uid().to_le_bytes();
let index = 3 * ((behavior.uid() - 1) as usize);
shared_guard.state_buffer[index] = uid[0];
shared_guard.state_buffer[index + 1] = uid[1];
shared_guard.state_buffer[index + 2] = state;
if shared_guard.recording {
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::expect_used)]
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_micros() as u64;
let info = Groot2TransitionInfo::new(timestamp, behavior.uid(), *new_state);
if shared_guard.transitions_buffer.is_empty() {
shared_guard.transitions = 0;
} else if shared_guard.transitions >= TRANSITION_SIZE {
shared_guard.transitions_buffer.pop_front();
} else {
shared_guard.transitions += 1;
}
shared_guard.transitions_buffer.push_back(info);
}
drop(shared_guard);
}
}
};
element.add_pre_state_change_callback(id.clone(), callback);
}
}
#[allow(dead_code)]
pub struct Groot2Connector {
tx: mpsc::Sender<BehaviorTreeMessage>,
shared: Arc<Mutex<Groot2ConnectorData>>,
server_handle: JoinHandle<Result<(), zeromq::ZmqError>>,
watchdog_handle: JoinHandle<()>,
}
pub struct Groot2ConnectorData {
connected: bool,
recording: bool,
transitions: u32,
state_buffer: BytesMut,
transitions_buffer: VecDeque<Groot2TransitionInfo>,
last_communication: Instant,
}
impl Groot2Connector {
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn new(tree: &mut BehaviorTree, port: u16) -> Self {
let transitions_buffer = VecDeque::new();
let tree_size = tree.size() - 1; let mut state_buffer = BytesMut::zeroed((3 * tree_size) as usize);
for i in 0..tree_size {
let index = (3 * i) as usize;
let bytes = (i + 1).to_be_bytes();
state_buffer[index] = bytes[0];
state_buffer[index] = bytes[1];
}
let shared = Arc::new(Mutex::new(Groot2ConnectorData {
connected: false,
recording: false,
transitions: 0,
state_buffer,
transitions_buffer,
last_communication: Instant::now(),
}));
let shared_clone = shared.clone();
let sender = tree.sender();
let watchdog_handle = tokio::spawn(async move {
loop {
if let Some(mut data) = shared_clone.try_lock() {
#[allow(clippy::expect_used)]
if data.connected
&& Instant::now()
.checked_duration_since(data.last_communication)
.expect("time went backwards")
> Duration::from_secs(5)
{
let _ = sender
.send(BehaviorTreeMessage::RemoveAllGrootHooks)
.await;
data.connected = false;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let shared_clone = shared.clone();
let tree_id = tree.uuid();
#[allow(clippy::expect_used)]
let xml = XmlCreator::groot_write_tree(tree).expect("usually this should not happen");
let sender = tree.sender();
let server_handle = tokio::spawn(async move {
let server_address = String::from("tcp://0.0.0.0:") + &port.to_string();
let mut server_socket = zeromq::RepSocket::new();
server_socket.bind(&server_address).await?;
loop {
let request = server_socket.recv().await?;
shared_clone.lock().last_communication = Instant::now();
if let Some(bytes) = request.get(0) {
if let Ok(header) = Groot2RequestHeader::try_from(bytes) {
let rq_type = header.rq_type();
let reply_header = Groot2ReplyHeader::new(header, tree_id);
let mut reply = ZmqMessage::from(Bytes::from(&reply_header));
match rq_type {
Groot2RequestType::State => {
reply.push_back(shared_clone.lock().state_buffer.clone().into());
}
Groot2RequestType::FullTree => {
shared_clone.lock().connected = true;
let _ = sender
.send(BehaviorTreeMessage::AddGrootCallback(shared_clone.clone()))
.await;
reply.push_back(xml.clone());
}
Groot2RequestType::BlackBoard => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::HookInsert => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::HookRemove => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::HooksDump => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::RemoveAllHooks => {
shared_clone.lock().connected = false;
let _ = sender
.send(BehaviorTreeMessage::RemoveAllGrootHooks)
.await;
}
Groot2RequestType::DisableAllHooks => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::BreakpointReached => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::BreakpointUnlock => {
std::dbg!(&request);
todo!()
}
Groot2RequestType::ToggleRecording => {
if let Some(command) = request.get(1) {
let cmd = command.to_vec();
match &cmd[..] {
b"start" => {
let mut shared_guard = shared_clone.lock();
shared_guard.recording = true;
shared_guard.transitions_buffer.clear();
shared_guard
.transitions_buffer
.reserve(TRANSITION_SIZE as usize);
drop(shared_guard);
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::expect_used)]
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_micros() as u64;
reply.push_back(Bytes::from(timestamp.to_string()));
}
b"stop" => {
shared_clone.lock().recording = false;
}
_ => {
std::dbg!(&command);
todo!()
}
}
} else {
todo!()
}
}
Groot2RequestType::GetTransitions => {
let mut bytes = BytesMut::with_capacity((TRANSITION_SIZE * 9) as usize);
let mut shared_guard = shared_clone.lock();
for info in &shared_guard.transitions_buffer {
bytes.extend(Bytes::from(info));
}
reply.push_back(Bytes::from(bytes));
shared_guard.transitions_buffer.clear();
}
Groot2RequestType::Undefined => {
std::dbg!(&request);
todo!()
}
}
server_socket.send(reply).await?;
} else {
std::dbg!(&request);
todo!()
}
} else {
todo!()
}
}
});
Self {
tx: tree.sender(),
shared,
server_handle,
watchdog_handle,
}
}
}