pub(crate) mod client;
use super::protocol;
use async_trait::async_trait;
use thiserror::Error;
pub use super::{
ClientBuilder, HttpClientBuilder, Response, ResultFormat, UdsClientBuilder, UseHttp, UseHttps,
};
pub use client::{HttpConnector, HttpsConnector};
#[derive(Debug, Error)]
pub enum Error {
#[error("protocol error: {0}")]
ProtocolError(#[from] protocol::Error),
#[error("communication error: {0}")]
ClientError(#[from] client::Error),
}
#[async_trait]
pub trait Runner {
async fn run<S>(self, cmds: &[S], format: ResultFormat) -> Result<Response, Error>
where
S: AsRef<str> + Send + Sync;
}
#[async_trait]
impl<T: client::Requester + Send> Runner for T {
async fn run<S>(self, cmds: &[S], format: ResultFormat) -> Result<Response, Error>
where
S: AsRef<str> + Send + Sync,
{
let request = protocol::make_run_request(cmds, format);
let response = self.do_request(request).await?;
protocol::parse_response(&response).map_err(|e| e.into())
}
}
impl ClientBuilder<UdsClientBuilder> {
pub async fn build_async(self) -> Result<client::UdsClient, Error> {
let socket_name = self
.0
.socket_name
.unwrap_or_else(|| protocol::make_socket_name(super::SYSNAME));
client::UdsClient::connect(self.0.sysname, socket_name)
.await
.map_err(|e| e.into())
}
}
impl ClientBuilder<HttpClientBuilder<UseHttp>> {
pub fn build_async(self) -> client::HttpClient<HttpConnector> {
client::HttpClient::new_http(self.0.hostname, self.0.auth, self.0.timeout)
}
}
impl ClientBuilder<HttpClientBuilder<UseHttps>> {
pub fn build_async(self) -> client::HttpClient<HttpsConnector> {
client::HttpClient::new_https(
self.0.hostname,
self.0.auth,
self.0.timeout,
self.0.https.insecure,
)
}
}
#[deprecated(since = "0.2.0", note = "please use the `ClientBuilder`")]
pub async fn eapi_run<T: AsRef<str> + Send + Sync>(
sysname: Option<&str>,
commands: &[T],
format: ResultFormat,
) -> Result<Response, Error> {
let mut builder = ClientBuilder::unix_socket();
if let Some(sysname) = sysname {
builder = builder.set_sysname(sysname.to_owned());
}
builder.build_async().await?.run(commands, format).await
}
#[cfg(test)]
mod tests {
use super::*;
use std::convert::Infallible;
async fn run_http_server(
response: &str,
) -> (
u16,
tokio::sync::oneshot::Sender<()>,
tokio::sync::mpsc::Receiver<(Vec<u8>, Vec<u8>)>,
) {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let (tx_shut, rx_shut) = tokio::sync::oneshot::channel::<()>();
let addr = ([127, 0, 0, 1], 0).into();
let incoming = hyper::server::conn::AddrIncoming::bind(&addr).unwrap();
let port = incoming.local_addr().port();
let response = response.to_string();
tokio::spawn(async move {
let make_service = hyper::service::make_service_fn(move |_conn| {
let response = response.clone();
let sender = sender.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(
move |req: hyper::Request<hyper::Body>| {
let auth = req
.headers()
.get("Authorization")
.unwrap_or(&hyper::header::HeaderValue::from_static(""))
.as_bytes()
.to_vec();
let response = response.clone();
let sender = sender.clone();
async move {
let body = hyper::body::to_bytes(req.into_body()).await?.to_vec();
sender.send((auth, body)).await.unwrap();
Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::from(
response,
)))
}
},
))
}
});
let server = hyper::server::Server::builder(incoming)
.serve(make_service)
.with_graceful_shutdown(async {
rx_shut.await.ok();
});
server.await.unwrap();
});
(port, tx_shut, receiver)
}
#[tokio::test]
async fn test_uds_ok() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tmp_dir = tempfile::tempdir()?;
let socket_name = tmp_dir
.path()
.join(crate::SYSNAME)
.to_str()
.ok_or("can't convert path to string")?
.to_string();
let response = r#"{
"jsonrpc": "2.0",
"result": ["test1", "test2", {"a": "b"}],
"id": "1"
}"#;
let sname = socket_name.clone();
let handle = tokio::task::spawn_blocking(move || {
let (ready, handle) = crate::tests::run_uds_server(sname, crate::SYSNAME, response);
ready.wait();
handle
})
.await?;
let result = ClientBuilder::unix_socket()
.set_sysname(crate::SYSNAME.to_owned())
.set_socket_name(socket_name)
.build_async()
.await?
.run(&["show run", "show int", "show clock"], ResultFormat::Json)
.await?;
let request = tokio::task::spawn_blocking(|| match handle.join() {
Ok(r) => r,
Err(e) => std::panic::resume_unwind(e),
})
.await??;
let expected = serde_json::json!({
"jsonrpc": "2.0",
"method": "runCmds",
"params": {
"version": "latest",
"cmds": ["show run", "show int", "show clock"],
"format": "json",
},
"id": "1"
})
.to_string();
assert_eq!(request, expected.as_bytes());
assert_eq!(
result,
Response::Result(vec![
"\"test1\"".to_string(),
"\"test2\"".to_string(),
"{\"a\":\"b\"}".to_string()
])
);
Ok(())
}
#[tokio::test]
async fn test_uds_error() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tmp_dir = tempfile::tempdir()?;
let socket_name = tmp_dir
.path()
.join(crate::SYSNAME)
.to_str()
.ok_or("can't convert path to string")?
.to_string();
let response = r#"{
"jsonrpc": "2.0",
"error": {
"message": "error message",
"code": 3,
"data": ["a", "b"]
},
"id": "1"
}"#;
let sname = socket_name.clone();
let handle = tokio::task::spawn_blocking(move || {
let (ready, handle) = crate::tests::run_uds_server(sname, crate::SYSNAME, response);
ready.wait();
handle
})
.await?;
let result = ClientBuilder::unix_socket()
.set_sysname(crate::SYSNAME.to_owned())
.set_socket_name(socket_name)
.build_async()
.await?
.run(&["show run", "show int", "show clock"], ResultFormat::Json)
.await?;
let request = tokio::task::spawn_blocking(|| match handle.join() {
Ok(r) => r,
Err(e) => std::panic::resume_unwind(e),
})
.await??;
let expected = serde_json::json!({
"jsonrpc": "2.0",
"method": "runCmds",
"params": {
"version": "latest",
"cmds": ["show run", "show int", "show clock"],
"format": "json",
},
"id": "1"
})
.to_string();
assert_eq!(request, expected.as_bytes());
assert_eq!(
result,
Response::Error {
message: "error message".to_string(),
code: 3,
errors: vec!["\"a\"".to_string(), "\"b\"".to_string()]
}
);
Ok(())
}
#[tokio::test]
async fn test_http_ok() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let response = r#"{
"jsonrpc": "2.0",
"result": ["test1", "test2", {"a": "b"}],
"id": "1"
}"#;
let (port, shutdown, mut receiver) = run_http_server(response).await;
let result = ClientBuilder::http("localhost:".to_owned() + &port.to_string())
.set_authentication("admin".to_owned(), "pass".to_owned())
.build_async()
.run(&["show run", "show int", "show clock"], ResultFormat::Json)
.await?;
let request = receiver.recv().await.unwrap();
let expected = serde_json::json!({
"jsonrpc": "2.0",
"method": "runCmds",
"params": {
"version": "latest",
"cmds": ["show run", "show int", "show clock"],
"format": "json",
},
"id": "1"
})
.to_string();
assert_eq!(request.0, "Basic YWRtaW46cGFzcw==".as_bytes());
assert_eq!(request.1, expected.as_bytes());
assert_eq!(
result,
Response::Result(vec![
"\"test1\"".to_string(),
"\"test2\"".to_string(),
"{\"a\":\"b\"}".to_string()
])
);
let _ = shutdown.send(());
Ok(())
}
#[tokio::test]
async fn test_http_error() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let response = r#"{
"jsonrpc": "2.0",
"error": {
"message": "error message",
"code": 3,
"data": ["a", "b"]
},
"id": "1"
}"#;
let (port, shutdown, mut receiver) = run_http_server(response).await;
let result = ClientBuilder::http("localhost:".to_owned() + &port.to_string())
.set_authentication("admin".to_owned(), "pass".to_owned())
.build_async()
.run(&["show run", "show int", "show clock"], ResultFormat::Json)
.await?;
let request = receiver.recv().await.unwrap();
let expected = serde_json::json!({
"jsonrpc": "2.0",
"method": "runCmds",
"params": {
"version": "latest",
"cmds": ["show run", "show int", "show clock"],
"format": "json",
},
"id": "1"
})
.to_string();
assert_eq!(request.0, "Basic YWRtaW46cGFzcw==".as_bytes());
assert_eq!(request.1, expected.as_bytes());
assert_eq!(
result,
Response::Error {
message: "error message".to_string(),
code: 3,
errors: vec!["\"a\"".to_string(), "\"b\"".to_string()]
}
);
let _ = shutdown.send(());
Ok(())
}
}