behaviortree-core 0.1.0

Core implementation of behaviortree
Documentation
// Copyright © 2026 Stephan Kunz
//! ZMTP 3.1 NULL-mechanism REP socket over raw TCP for 'std'.

use alloc::{vec, vec::Vec};

use std::io;

use bytes::Bytes;

use tokio::{
	io::{AsyncReadExt, AsyncWriteExt},
	net::{TcpListener, TcpStream},
};

use super::{FLAG_COMMAND, FLAG_LONG, FLAG_MORE, GREETING_SIZE, READY_BODY, build_greeting, message::Groot2Message};

// Address the listener binds to. This is the IPv4 loopback address, so the
// socket is only reachable from the local machine.
const IP_ADDRESS: &str = "127.0.0.1";

/// Write one ZMTP frame: the flags byte, the payload length, then the payload.
///
/// Payloads of at most 255 bytes use the short form (a single length byte).
/// Anything longer sets `FLAG_LONG` in the flags byte and sends the length as
/// an 8-byte big-endian integer.
///
/// # Errors
/// Returns the first I/O error reported by `writer`.
async fn write_frame<W: AsyncWriteExt + Unpin>(writer: &mut W, flags: u8, data: &[u8]) -> io::Result<()> {
	let payload_len = data.len();
	if payload_len <= usize::from(u8::MAX) {
		// Short form: the length fits in one byte.
		writer.write_u8(flags).await?;
		writer.write_u8(payload_len as u8).await?;
	} else {
		// Long form: mark the frame and send a 64-bit length.
		writer.write_u8(flags | FLAG_LONG).await?;
		writer.write_u64(payload_len as u64).await?;
	}
	writer.write_all(data).await
}

/// Read one ZMTP frame. Returns `(flags, data)`.
///
/// The flags byte is returned unmodified (including `FLAG_LONG` when the peer
/// used the 8-byte length form).
///
/// The announced length comes from the peer and is therefore untrusted. The
/// payload buffer is not allocated up front from that value; it grows only as
/// payload bytes actually arrive, so a forged length cannot force a huge
/// allocation.
///
/// # Errors
/// Returns any I/O error from `reader`, and `UnexpectedEof` when the stream
/// ends before the announced number of payload bytes has been received.
async fn read_frame<R: AsyncReadExt + Unpin>(reader: &mut R) -> io::Result<(u8, Vec<u8>)> {
	// Largest buffer reserved before any payload byte has been received.
	const MAX_PREALLOC: u64 = 64 * 1024;

	let flags = reader.read_u8().await?;
	// Keep the length as u64: no truncating cast on 32-bit targets.
	let len: u64 = if flags & FLAG_LONG != 0 {
		reader.read_u64().await?
	} else {
		u64::from(reader.read_u8().await?)
	};

	// The cast is lossless because the value is capped at MAX_PREALLOC.
	let mut data = Vec::with_capacity(len.min(MAX_PREALLOC) as usize);
	// `take` stops after exactly `len` bytes, so the next frame is left untouched.
	let received = (&mut *reader)
		.take(len)
		.read_to_end(&mut data)
		.await?;
	if received as u64 != len {
		return Err(io::Error::new(
			io::ErrorKind::UnexpectedEof,
			"connection closed in the middle of a ZMTP frame",
		));
	}
	Ok((flags, data))
}

/// Send our ZMTP greeting, as produced by `build_greeting(is_server)`.
///
/// Intended to be called right after the connection is established, without
/// waiting for the peer. Pass `is_server = true` for a REP socket.
///
/// # Errors
/// Returns the I/O error reported by `writer`.
async fn write_greeting<W: AsyncWriteExt + Unpin>(writer: &mut W, is_server: bool) -> io::Result<()> {
	let greeting = build_greeting(is_server);
	writer.write_all(&greeting).await?;
	Ok(())
}

/// Read exactly `GREETING_SIZE` bytes, the peer's greeting, and return them.
///
/// The bytes are returned as received; nothing in them is validated here.
///
/// # Errors
/// Returns the I/O error from `reader`, including the case where the stream
/// ends before the full greeting has arrived.
async fn read_greeting<R: AsyncReadExt + Unpin>(reader: &mut R) -> io::Result<[u8; GREETING_SIZE]> {
	let mut peer_greeting = [0u8; GREETING_SIZE];
	let _bytes_read = reader.read_exact(&mut peer_greeting[..]).await?;
	Ok(peer_greeting)
}

/// Send our READY command: a single frame carrying `READY_BODY` with the
/// COMMAND flag set.
///
/// # Errors
/// Returns the I/O error reported by `writer`.
async fn write_ready<W: AsyncWriteExt + Unpin>(writer: &mut W) -> io::Result<()> {
	write_frame(writer, FLAG_COMMAND, READY_BODY).await?;
	Ok(())
}

/// Consume one frame and require it to be a COMMAND frame (the peer's READY).
///
/// Only the COMMAND flag is checked; the frame body is read and thrown away.
///
/// # Errors
/// Returns `InvalidData` when the frame lacks the COMMAND flag, or the I/O
/// error from reading the frame.
async fn read_ready<R: AsyncReadExt + Unpin>(reader: &mut R) -> io::Result<()> {
	let (flags, _) = read_frame(reader).await?;
	if flags & FLAG_COMMAND != 0 {
		Ok(())
	} else {
		Err(io::Error::new(io::ErrorKind::InvalidData, "expected COMMAND frame for READY"))
	}
}

/// ZMTP 3.1 NULL-mechanism REP socket.
///
/// Binds once. Each `recv()` accepts a new TCP connection when none is active,
/// runs the greeting + READY handshake, then reads one message (REP envelope stripped).
/// `send()` prepends the envelope. On any I/O error the connection is dropped so
/// the next `recv()` re-accepts.
///
/// At most one peer connection is held at a time.
pub struct Groot2Socket {
	/// Listening socket created in `bind`; peer connections are accepted from it.
	listener: TcpListener,
	/// The active peer connection after a completed handshake, or `None` when
	/// the next `recv()` has to accept a new one.
	conn: Option<TcpStream>,
}

impl Groot2Socket {
	/// Bind to `IP_ADDRESS:port`. Pass `0` to let the OS pick a free port.
	///
	/// No peer is accepted here; that happens lazily in [`Self::recv`].
	///
	/// # Errors
	/// Propagates the error returned by `TcpListener::bind`.
	pub async fn bind(port: u16) -> io::Result<Self> {
		let listener = TcpListener::bind((IP_ADDRESS, port)).await?;
		Ok(Self { listener, conn: None })
	}

	/// Returns the local address. Useful when `port = 0` was passed to `bind`.
	///
	/// # Errors
	/// Propagates the error returned by the listener's `local_addr`.
	pub fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
		self.listener.local_addr()
	}

	/// Accept a connection and run the ZMTP greeting + READY exchange.
	/// Writing 64 bytes never blocks on the kernel TCP buffer, so sequential
	/// write-then-read is safe for the NULL mechanism (no deadlock).
	///
	/// The connection is stored in `self.conn` only after the whole handshake
	/// succeeded. On any error the freshly accepted stream is dropped and
	/// `self.conn` is left untouched.
	async fn connect(&mut self) -> io::Result<()> {
		let (mut stream, _peer) = self.listener.accept().await?;
		// Every frame is sent as several small writes (flags, length, body).
		// Disable Nagle's algorithm so they are not held back waiting for
		// the peer's ACK in this request/reply exchange.
		stream.set_nodelay(true)?;
		write_greeting(&mut stream, true).await?;
		read_greeting(&mut stream).await?;
		write_ready(&mut stream).await?;
		read_ready(&mut stream).await?;
		self.conn = Some(stream);
		Ok(())
	}

	/// Receive one message. Accepts a new connection (with handshake) when none
	/// is active. Strips the leading empty REP envelope frame.
	///
	/// # Errors
	/// Returns the I/O error from accepting, from the handshake, or from
	/// reading the message. When reading fails, the connection is dropped so
	/// that the next call accepts a new one.
	pub async fn recv(&mut self) -> io::Result<Groot2Message> {
		if self.conn.is_none() {
			self.connect().await?;
		}
		let stream = self
			.conn
			.as_mut()
			.expect("conn is Some after connect");
		let received = Self::read_message(stream).await;
		if received.is_err() {
			// The stream state is unknown after a failed read; start over.
			self.conn = None;
		}
		received
	}

	/// Read frames until one arrives without the MORE flag, then drop a leading
	/// empty frame (the envelope delimiter), if there is one.
	async fn read_message(stream: &mut TcpStream) -> io::Result<Groot2Message> {
		let mut frames: Vec<Bytes> = Vec::new();
		loop {
			let (flags, data) = read_frame(stream).await?;
			frames.push(Bytes::from(data));
			if flags & FLAG_MORE == 0 {
				break;
			}
		}
		// Strip the empty envelope delimiter that REQ always prepends.
		if matches!(frames.first(), Some(first) if first.is_empty()) {
			frames.remove(0);
		}
		Ok(Groot2Message(frames))
	}

	/// Send a reply. Prepends the empty REP envelope frame automatically.
	///
	/// # Errors
	/// - `NotConnected` when there is no active connection.
	/// - `InvalidInput` when `msg` contains no frames. Nothing is written and
	///   the connection is kept, so a valid reply can still be sent.
	/// - Any I/O error from writing; in that case the connection is dropped.
	pub async fn send(&mut self, msg: Groot2Message) -> io::Result<()> {
		let stream = self
			.conn
			.as_mut()
			.ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "no active ZMTP connection"))?;
		// Reject a caller mistake before touching the wire, so that it does
		// not cost us a healthy connection.
		if msg.0.is_empty() {
			return Err(io::Error::new(
				io::ErrorKind::InvalidInput,
				"ZMTP send called with zero-frame message",
			));
		}
		let sent = Self::write_message(stream, msg).await;
		if sent.is_err() {
			// A partially written message cannot be recovered; start over.
			self.conn = None;
		}
		sent
	}

	/// Write the empty envelope frame followed by all frames of `msg`. Every
	/// frame except the last carries the MORE flag.
	///
	/// Returns `InvalidInput` without writing anything when `msg` has no
	/// frames: an envelope with MORE set and nothing after it would leave the
	/// peer waiting for a frame that never comes.
	async fn write_message(stream: &mut TcpStream, msg: Groot2Message) -> io::Result<()> {
		let (last, leading) = match msg.0.split_last() {
			Some(parts) => parts,
			None => {
				return Err(io::Error::new(
					io::ErrorKind::InvalidInput,
					"ZMTP send called with zero-frame message",
				));
			}
		};
		// Prepend empty envelope delimiter with MORE flag.
		write_frame(stream, FLAG_MORE, &[]).await?;
		for frame in leading {
			write_frame(stream, FLAG_MORE, frame).await?;
		}
		// The final frame clears MORE and thereby terminates the message.
		write_frame(stream, 0x00, last).await
	}
}

/// A message built from a single `Bytes` has that frame at index 0 and
/// nothing at index 1.
#[test]
fn single_frame_message() {
	let payload = Bytes::from_static(b"hello");
	let message = Groot2Message::from(payload.clone());

	let first = message.get(0);
	let second = message.get(1);

	assert_eq!(first, Some(&payload));
	assert_eq!(second, None);
}

/// `push_back` appends a frame after the existing ones.
#[test]
fn push_back_adds_frames() {
	let first = Bytes::from_static(b"frame1");
	let second = Bytes::from_static(b"frame2");

	let mut message = Groot2Message::from(first.clone());
	message.push_back(second.clone());

	// Both frames are present, in insertion order, and nothing follows them.
	assert_eq!(message.get(0), Some(&first));
	assert_eq!(message.get(1), Some(&second));
	assert_eq!(message.get(2), None);
}

/// A short frame without flags survives a write/read round trip unchanged.
#[tokio::test]
async fn frame_roundtrip_short() {
	let (mut tx, mut rx) = tokio::io::duplex(64);

	write_frame(&mut tx, 0x00, b"hello").await.unwrap();
	let (got_flags, got_data) = read_frame(&mut rx).await.unwrap();

	assert_eq!(got_flags, 0x00);
	assert_eq!(got_data, b"hello");
}

/// The MORE flag is carried through a write/read round trip.
#[tokio::test]
async fn frame_roundtrip_with_more_flag() {
	let (mut tx, mut rx) = tokio::io::duplex(64);

	let written = write_frame(&mut tx, FLAG_MORE, b"part1").await;
	written.unwrap();

	let (got_flags, got_data) = read_frame(&mut rx).await.unwrap();
	assert_eq!(got_flags, FLAG_MORE);
	assert_eq!(got_data, b"part1");
}

/// A payload longer than 255 bytes is sent in the long form and the reader
/// sees `FLAG_LONG` in the returned flags.
#[tokio::test]
async fn frame_roundtrip_long() {
	// 256 bytes is the smallest payload that needs the 8-byte length.
	let payload = vec![0xab_u8; 256];
	let (mut tx, mut rx) = tokio::io::duplex(4096);

	let written = write_frame(&mut tx, FLAG_MORE, &payload).await;
	written.unwrap();

	let (got_flags, got_data) = read_frame(&mut rx).await.unwrap();
	assert_eq!(got_flags, FLAG_MORE | FLAG_LONG);
	assert_eq!(got_data, payload);
}

/// Checks the fixed fields of the greeting written with `is_server = true`:
/// signature bytes, version, mechanism name, as-server byte and zero filler.
#[tokio::test]
async fn greeting_server_bytes_are_correct() {
	let (mut tx, mut rx) = tokio::io::duplex(128);
	write_greeting(&mut tx, true).await.unwrap();
	let greeting = read_greeting(&mut rx).await.unwrap();

	// Signature
	assert_eq!(greeting[0], 0xff, "signature[0]");
	assert_eq!(greeting[9], 0x7f, "signature[9]");
	// Version 3.1
	assert_eq!(greeting[10], 0x03, "ZMTP major");
	assert_eq!(greeting[11], 0x01, "ZMTP minor");
	// Mechanism and role
	assert_eq!(&greeting[12..16], b"NULL", "mechanism");
	assert_eq!(greeting[32], 0x01, "as-server=true");
	// Remaining bytes
	assert_eq!(&greeting[33..64], &[0u8; 31], "filler is zero");
}

/// The greeting written with `is_server = false` has a zero as-server byte.
#[tokio::test]
async fn greeting_client_has_as_server_false() {
	let (mut tx, mut rx) = tokio::io::duplex(128);

	write_greeting(&mut tx, false).await.unwrap();
	let greeting = read_greeting(&mut rx).await.unwrap();

	let as_server = greeting[32];
	assert_eq!(as_server, 0x00, "as-server=false");
}

/// The READY frame produced by `write_ready` is accepted by `read_ready`.
#[tokio::test]
async fn ready_roundtrip() {
	let (mut tx, mut rx) = tokio::io::duplex(64);

	write_ready(&mut tx).await.unwrap();

	// Must not error.
	let outcome = read_ready(&mut rx).await;
	outcome.unwrap();
}

/// `read_ready` fails with `InvalidData` when the frame has no COMMAND flag,
/// even if its body looks like a READY command.
#[tokio::test]
async fn ready_rejects_non_command_frame() {
	let (mut tx, mut rx) = tokio::io::duplex(64);

	// A plain message frame: the flags byte is zero, so COMMAND is not set.
	let written = write_frame(&mut tx, 0x00, b"\x05READY").await;
	written.unwrap();

	let failure = read_ready(&mut rx).await.unwrap_err();
	assert_eq!(failure.kind(), std::io::ErrorKind::InvalidData);
}

/// End-to-end check over real TCP: a hand-written REQ-side client performs the
/// handshake, sends "ping" and expects "pong" back from `Groot2Socket`.
#[tokio::test]
async fn rep_socket_full_roundtrip() {
	// Port 0 = OS picks a free port.
	let mut server = Groot2Socket::bind(0).await.unwrap();
	let port = server.local_addr().unwrap().port();

	// Spawn a minimal REQ-side client.
	let client_task = tokio::spawn(async move {
		let mut sock = TcpStream::connect(("127.0.0.1", port)).await.unwrap();

		// Greeting exchange; the client side uses as-server = false.
		write_greeting(&mut sock, false).await.unwrap();
		read_greeting(&mut sock).await.unwrap();

		// Client READY — body content is ignored by server; COMMAND flag is all that matters.
		write_frame(&mut sock, FLAG_COMMAND, b"\x05READY").await.unwrap();
		read_ready(&mut sock).await.unwrap();

		// Request: [empty envelope, data frame]
		write_frame(&mut sock, FLAG_MORE, &[]).await.unwrap();
		write_frame(&mut sock, 0x00, b"ping").await.unwrap();

		// Reply: [empty envelope, data frame]; only the data frame is returned.
		let (_envelope_flags, _envelope) = read_frame(&mut sock).await.unwrap();
		let (_reply_flags, reply) = read_frame(&mut sock).await.unwrap();
		reply
	});

	// Server receives "ping"
	let request = server.recv().await.unwrap();
	assert_eq!(request.get(0).unwrap().as_ref(), b"ping");

	// Server replies "pong"
	let reply = Groot2Message::from(Bytes::from_static(b"pong"));
	server.send(reply).await.unwrap();

	let reply_bytes = client_task.await.unwrap();
	assert_eq!(reply_bytes, b"pong");
}