use tokio::net::UdpSocket;
use std::net::SocketAddr;
use serde_json;
use log;
use anyhow::anyhow;
use super::types::{CoreLinkMessage, CoreLinkMessageType, CoreLinkSubscriptionPayload, CoreLinkTopicValuePayload};
pub struct UdpClient {
socket: UdpSocket,
broker_addr: SocketAddr,
last_transaction_id: u32,
node_id: String,
}
impl UdpClient {
pub async fn new(node_id: &str, bind_addr: &str, broker_addr: &str) -> Result<Self, anyhow::Error> {
let socket = UdpSocket::bind(bind_addr).await?;
let broker_addr: SocketAddr = broker_addr.parse()?;
if node_id.len() < 2 {
return Err(anyhow!("Invalid node id provided: {}", node_id));
}
Ok(Self {
socket,
broker_addr,
last_transaction_id: 0,
node_id: node_id.to_lowercase(),
})
}
pub fn node_id(&self) -> &str {
return self.node_id.as_str();
}
fn increment_transaction_id(&mut self) -> u32 {
self.last_transaction_id += 1;
if self.last_transaction_id == 0 {
self.last_transaction_id = 1;
}
return self.last_transaction_id;
}
pub async fn send_message(&self, message: &mut CoreLinkMessage) -> Result<CoreLinkMessage, anyhow::Error> {
CoreLinkMessage::calculate_crc32(&message);
let msg_str = serde_json::to_string(&message)?;
self.socket.send_to(msg_str.as_bytes(), self.broker_addr).await?;
let mut buf = [0; 1024];
let (len, _) = self.socket.recv_from(&mut buf).await?;
let response_str = String::from_utf8(buf[..len].to_vec())?;
let response: CoreLinkMessage = serde_json::from_str(&response_str)?;
Ok(response)
}
pub async fn register(&mut self) -> Result<(), anyhow::Error> {
let mut message = CoreLinkMessage {
transaction_id: self.increment_transaction_id(),
timecode: 0,
node_id: self.node_id.clone(),
message_type: CoreLinkMessageType::Register,
crc: 0,
data: serde_json::Value::Null,
};
match self.send_message(&mut message).await {
Ok(res) => {
println!("** {:?} ** ", res);
if res.is_error() {
return Err(anyhow!("Failed to register node: {}", res.error_message()));
} else {
return Ok(());
}
}
Err(err) => {
return Err(anyhow!("Failed to send register request: {}", err));
}
}
}
pub async fn subscribe(
&mut self,
fqdn: &str,
options: &serde_json::Value,
user_data: &serde_json::Value,
) -> Result<(), anyhow::Error> {
let mut payload = CoreLinkSubscriptionPayload::new(fqdn);
payload.options = options.clone();
payload.user_data = user_data.clone();
match serde_json::to_value(&payload) {
Ok(data) => {
let mut message = CoreLinkMessage {
transaction_id: self.increment_transaction_id(),
timecode: 0,
node_id: self.node_id.clone(),
message_type: CoreLinkMessageType::Subscribe,
crc: 0,
data: data,
};
match self.send_message(&mut message).await {
Ok(res) => {
if let Some(err) = res.data.get("error_message") {
return Err(anyhow!("Failed to subscribe to value: {}", err.to_string()));
} else {
return Ok(());
}
}
Err(err) => {
return Err(anyhow!("Failed to send subscribe request: {}", err));
}
}
}
Err(err) => {
return Err(anyhow!("Failed to generate SUBSCRIBE message: {}", err));
}
}
}
pub async fn write(
&mut self,
fqdn: &str,
value: &serde_json::Value,
user_data: &serde_json::Value,
) -> Result<(), anyhow::Error> {
let mut payload = CoreLinkTopicValuePayload::new(fqdn);
payload.value = value.clone();
payload.user_data = user_data.clone();
match serde_json::to_value(&payload) {
Ok(data) => {
let mut message = CoreLinkMessage {
transaction_id: self.increment_transaction_id(),
timecode: 0,
node_id: self.node_id.clone(),
message_type: CoreLinkMessageType::Write,
crc: 0,
data: data,
};
match self.send_message(&mut message).await {
Ok(res) => {
if let Some(err) = res.data.get("error_message") {
return Err(anyhow!("Failed to write value: {}", err.to_string()));
} else {
return Ok(());
}
}
Err(err) => {
return Err(anyhow!("Failed to send write request: {}", err));
}
}
}
Err(err) => {
return Err(anyhow!("Failed to generate WRITE message: {}", err));
}
}
}
pub async fn listen(&self) -> Result<(), anyhow::Error> {
let mut buf = [0; 1024];
loop {
let (len, _) = self.socket.recv_from(&mut buf).await?;
let msg_str = String::from_utf8(buf[..len].to_vec())?;
let message: CoreLinkMessage = serde_json::from_str(&msg_str)?;
log::info!("Received message: {:?}", message);
}
}
}