use anyhow::Result;
use futures::{SinkExt, StreamExt};
use spop::{
SpopCodec, SpopFrame, Version,
actions::VarScope,
frame::{FramePayload, FrameType},
frames::{Ack, AgentDisconnect, AgentHello, FrameCapabilities, HaproxyHello},
};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Framed;
/// Entry point of the SPOE agent.
///
/// Binds a TCP listener on `0.0.0.0:12345` and then accepts connections
/// forever, handing each accepted stream to [`handle_connection`] on its own
/// spawned task.
///
/// # Errors
///
/// Returns an error (terminating the agent) if binding the listener or
/// accepting a connection fails. Failures inside an individual connection are
/// logged and do not stop the accept loop.
#[tokio::main]
async fn main() -> Result<()> {
    let listener = TcpListener::bind("0.0.0.0:12345").await?;
    println!("SPOE Agent listening on port 12345...");

    loop {
        let (stream, addr) = listener.accept().await?;
        println!("New connection from {addr}");

        // The task is detached (its JoinHandle is never awaited), so the
        // handler's result has to be inspected inside the task itself;
        // otherwise every error it returns would be dropped silently.
        tokio::spawn(async move {
            if let Err(e) = handle_connection(stream).await {
                eprintln!("Connection from {addr} failed: {e:#}");
            }
        });
    }
}
async fn handle_connection(u_stream: TcpStream) -> Result<()> {
let mut socket = Framed::new(u_stream, SpopCodec);
while let Some(result) = socket.next().await {
let frame = match result {
Ok(f) => f,
Err(e) => {
eprintln!("Frame read error: {e:?}");
break;
}
};
match frame.frame_type() {
FrameType::HaproxyHello => {
let hello = HaproxyHello::try_from(frame.payload())
.map_err(|_| anyhow::anyhow!("Failed to parse HaproxyHello"))?;
let max_frame_size = hello.max_frame_size;
let is_healthcheck = hello.healthcheck.unwrap_or(false);
let version = Version::parse("2.0.0")?;
let agent_hello = AgentHello {
version,
max_frame_size,
capabilities: vec![FrameCapabilities::Pipelining],
};
println!("Sending AgentHello: {:#?}", agent_hello.payload());
match socket.send(Box::new(agent_hello)).await {
Ok(()) => println!("Frame sent successfully"),
Err(e) => eprintln!("Failed to send frame: {e:?}"),
}
if is_healthcheck {
println!("Handled healthcheck. Closing socket.");
return Ok(());
}
}
FrameType::HaproxyDisconnect => {
let agent_disconnect = AgentDisconnect {
status_code: 0,
message: "Goodbye".to_string(),
};
println!("Sending AgentDisconnect: {:#?}", agent_disconnect.payload());
socket.send(Box::new(agent_disconnect)).await?;
socket.close().await?;
return Ok(());
}
FrameType::Notify => {
if let FramePayload::ListOfMessages(messages) = &frame.payload() {
let mut ack = Ack::new(frame.metadata().stream_id, frame.metadata().frame_id);
for message in *messages {
match message.name.as_str() {
"check-client-ip" => {
let random_value: u32 = rand::random_range(0..100);
ack = ack.set_var(VarScope::Session, "ip_score", random_value);
}
"log-request" => {
ack = ack.set_var(VarScope::Transaction, "my_var", "tequila");
}
_ => {
eprintln!("Unsupported message: {:?}", message.name);
}
}
}
println!("Sending Ack: {:#?}", ack.payload());
socket.send(Box::new(ack)).await?;
}
}
_ => {
eprintln!("Unsupported frame type: {:?}", frame.frame_type());
}
}
}
println!("Socket closed by peer");
Ok(())
}