use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{io::AsyncWriteExt, net::UnixStream};
use crate::{ErrorKind, NmstateError};
const BUFFER_SIZE: usize = 4096;
const MAX_RECV_RETRY_COUNT: usize = 1048576;
#[derive(Debug)]
pub(crate) struct OvsDbJsonRpc {
socket: UnixStream,
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
struct OvsDbRpcError {
error: String,
details: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct OvsDbRpcReply {
result: Value,
error: Option<OvsDbRpcError>,
id: u64,
}
impl OvsDbJsonRpc {
pub(crate) async fn connect(
socket_path: &str,
) -> Result<Self, NmstateError> {
Ok(Self {
socket: UnixStream::connect(socket_path).await.map_err(|e| {
NmstateError::new(ErrorKind::Bug, format!("socket error {e}"))
})?,
})
}
pub(crate) async fn send(
&mut self,
data: &Value,
) -> Result<(), NmstateError> {
let buffer = serde_json::to_string(&data)?;
log::debug!("OVSDB: sending command {buffer}");
self.socket
.write_all(buffer.as_bytes())
.await
.map_err(|e| {
NmstateError::new(
ErrorKind::PluginFailure,
format!("Failed to send message to OVSDB: {e}"),
)
})?;
self.socket.flush().await.map_err(|e| {
NmstateError::new(
ErrorKind::PluginFailure,
format!(
"Failed to flush buffer when sending message to OVSDB: {e}"
),
)
})?;
Ok(())
}
pub(crate) async fn recv(
&mut self,
transaction_id: u64,
) -> Result<Value, NmstateError> {
let mut response: Vec<u8> = Vec::with_capacity(BUFFER_SIZE);
let mut reply: Result<OvsDbRpcReply, NmstateError> =
Err(NmstateError::new(
ErrorKind::PluginFailure,
"Empty reply from OVSDB".to_string(),
));
for _ in 0..MAX_RECV_RETRY_COUNT {
self.socket.readable().await.map_err(|e| {
NmstateError::new(
ErrorKind::PluginFailure,
format!("OVSDB connection is not readable: {e}"),
)
})?;
let mut buffer = [0; BUFFER_SIZE];
let read_size = match self.socket.try_read(&mut buffer) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(NmstateError::new(
ErrorKind::PluginFailure,
format!(
"Failed to read data from OVSDB connection: {e}"
),
));
}
};
log::debug!(
"OVSDB: recv size {read_size}, data {:?}",
&buffer[..read_size]
);
if read_size > 0 {
response.extend_from_slice(&buffer[..read_size]);
}
match String::from_utf8(response.clone()) {
Ok(reply_str) => {
log::debug!("OVSDB: recv string {:?}", &reply_str);
match serde_json::from_str::<OvsDbRpcReply>(&reply_str) {
Ok(r) => {
reply = Ok(r);
break;
}
Err(e) => {
reply = Err(NmstateError::new(
ErrorKind::PluginFailure,
format!(
"OVS db reply is not valid OvsDbRpcReply: \
{e}"
),
));
}
}
}
Err(e) => {
reply = Err(NmstateError::new(
ErrorKind::PluginFailure,
format!("OVS db reply is not valid UTF8 string: {e}"),
));
}
}
}
let reply = reply?;
if reply.id != transaction_id {
let e = NmstateError::new(
ErrorKind::PluginFailure,
format!(
"Transaction ID mismatch for OVS DB JSON RPC: {reply:?}"
),
);
log::error!("{e}");
Err(e)
} else if let Some(rpc_error) = reply.error {
let e = NmstateError::new(
ErrorKind::PluginFailure,
format!("OVS DB JSON RPC error: {rpc_error:?}"),
);
log::error!("{e}");
Err(e)
} else {
Ok(reply.result)
}
}
}