1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
#![allow(missing_docs)]
use minicbor::{Decode, Encode};
use crate::{Context, MessageSendReceiveOptions};
use ockam_core::api::Reply::Successful;
use ockam_core::api::{Error, Reply, Request, Response};
use ockam_core::compat::time::Duration;
use ockam_core::compat::vec::Vec;
use ockam_core::{LocalInfo, Result, Route};
/// This struct provides some support for making requests to another node
/// and receiving replies.
///
/// A `Client` stores the route to the destination together with an optional
/// default timeout used by the request methods that do not take an explicit one.
pub struct Client {
/// Route along which every request is sent (cloned for each request)
route: Route,
/// Default timeout used by `ask`, `tell` and `request`;
/// when `None`, requests are sent without a timeout
timeout: Option<Duration>,
}
impl Client {
/// Build a client that sends its requests along `route`.
///
/// The route is cloned into the client. `timeout` becomes the default timeout
/// used by `ask`, `tell` and `request`; passing `None` means those requests
/// are sent without a timeout.
///
/// WARNING: The caller is responsible for cleaning all the resources
/// involved in the Route when it's no longer used (like TCP connections or Secure Channels)
pub fn new(route: &Route, timeout: Option<Duration>) -> Self {
    let route = route.clone();
    Self { route, timeout }
}
/// Send a request with a body of type `T` and decode the reply as a value of type `R`.
///
/// The request is sent using the client's default timeout.
///
/// The outcome is a `Result<Reply<R>>`, which keeps three situations apart:
///
///  - communication errors: the outer `Result` is an `Err`
///  - request failures: the `Reply<R>` carries an error and a status code
///  - successes: the `Reply<R>` carries a value of type `R`
///
/// Note that a `Reply<T>` can be converted into a `Result<T>` by using the `success()?` method
/// if one is not interested in request failures.
pub async fn ask<T, R>(&self, ctx: &Context, req: Request<T>) -> Result<Reply<R>>
where
    T: Encode<()>,
    R: for<'a> Decode<'a, ()>,
{
    let raw: Vec<u8> = self.request_with_timeout(ctx, req, self.timeout).await?;
    Response::parse_response_reply::<R>(&raw[..])
}
/// Send a request of type T and don't expect a reply
/// See `ask` for more information
pub async fn tell<T>(&self, ctx: &Context, req: Request<T>) -> Result<Reply<()>>
where
T: Encode<()>,
{
let request_header = req.header().clone();
let bytes = self.request_with_timeout(ctx, req, self.timeout).await?;
let (response, decoder) = Response::parse_response_header(bytes.as_slice())?;
if !response.is_ok() {
Ok(Reply::Failed(
Error::from_failed_request(&request_header, &response.parse_err_msg(decoder)),
response.status(),
))
} else {
Ok(Successful(()))
}
}
/// Send a request with a body of type `T` and return the raw, undecoded reply bytes.
///
/// The client's default timeout is used.
/// See `ask` for more information
pub async fn request<T>(&self, ctx: &Context, req: Request<T>) -> Result<Vec<u8>>
where
    T: Encode<()>,
{
    let default_timeout = self.timeout;
    self.request_with_timeout(ctx, req, default_timeout).await
}
/// Send a request with a body of type `T` and return the raw, undecoded reply bytes,
/// using the given `timeout` instead of the client's default one.
///
/// A `timeout` of `None` sends the request without a timeout. Any local
/// information attached to the received message is discarded.
/// See `ask` for more information
pub async fn request_with_timeout<T>(
    &self,
    ctx: &Context,
    req: Request<T>,
    timeout: Option<Duration>,
) -> Result<Vec<u8>>
where
    T: Encode<()>,
{
    self.request_with_local_info(ctx, req, timeout)
        .await
        .map(|(body, _local_info)| body)
}
/// Send a request with a body of type `T` and decode the reply as a value of type `R`,
/// using the given `timeout` (`None` sends the request without a timeout).
///
/// In addition to the decoded reply, return the local information attached
/// to the received message.
/// See `ask` for more information
pub async fn ask_with_local_info<T, R>(
    &self,
    ctx: &Context,
    req: Request<T>,
    timeout: Option<Duration>,
) -> Result<(Reply<R>, Vec<LocalInfo>)>
where
    T: Encode<()>,
    R: for<'a> Decode<'a, ()>,
{
    let (raw, local_info) = self.request_with_local_info(ctx, req, timeout).await?;
    Response::parse_response_reply::<R>(&raw[..]).map(|reply| (reply, local_info))
}
/// Encode and send a request with a body of type `T` along the client's route,
/// then wait for the reply.
///
/// `timeout` bounds the wait for the reply; `None` sends the request without a timeout.
/// Returns the raw, undecoded reply body together with the local information
/// attached to the received message.
/// See `ask` for more information
async fn request_with_local_info<T>(
    &self,
    ctx: &Context,
    req: Request<T>,
    timeout: Option<Duration>,
) -> Result<(Vec<u8>, Vec<LocalInfo>)>
where
    T: Encode<()>,
{
    // Serialize the request before anything else so encoding errors surface early.
    let mut encoded = Vec::new();
    req.encode(&mut encoded)?;

    trace! {
        target: "ockam_api",
        id = %req.header().id(),
        method = ?req.header().method(),
        path = %req.header().path(),
        body = %req.header().has_body(),
    };

    let options = match timeout {
        Some(limit) => MessageSendReceiveOptions::new().with_timeout(limit),
        None => MessageSendReceiveOptions::new().without_timeout(),
    };

    // TODO: Check IdentityId is the same we sent message to?
    // TODO: Check response id matches request id?
    let reply = ctx
        .send_and_receive_extended::<Vec<u8>>(self.route.clone(), encoded, options)
        .await?;

    // Copy the local info out before taking the body of the reply.
    let local_info = reply.local_message().local_info().to_vec();
    Ok((reply.body(), local_info))
}
}