use std::net::{SocketAddrV4, SocketAddrV6};
use miette::miette;
use ockam::TcpTransport;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::flow_control::FlowControlId;
use ockam_core::{Address, Error, Result, Route, TransportType, LOCAL};
use ockam_multiaddr::proto::{
DnsAddr, Ip4, Ip6, Node, Project, Secure, Service, Space, Tcp, Worker,
};
use ockam_multiaddr::{Code, MultiAddr, Protocol};
use ockam_transport_tcp::{TcpConnection, TcpConnectionOptions, TCP};
use crate::error::ApiError;
pub fn local_multiaddr_to_route(ma: &MultiAddr) -> Result<Route> {
let mut rb = Route::new();
for p in ma.iter() {
match p.code() {
Worker::CODE => {
let local = p.cast::<Worker>().ok_or(Error::new(
Origin::Api,
Kind::Invalid,
format!("incorrect worker address {ma})",),
))?;
rb = rb.append(Address::new(LOCAL, &*local))
}
Service::CODE => {
let local = p.cast::<Service>().ok_or(Error::new(
Origin::Api,
Kind::Invalid,
format!("incorrect service address {ma})",),
))?;
rb = rb.append(Address::new(LOCAL, &*local))
}
Secure::CODE => {
let local = p.cast::<Secure>().ok_or(Error::new(
Origin::Api,
Kind::Invalid,
format!("incorrect secure address {ma})",),
))?;
rb = rb.append(Address::new(LOCAL, &*local))
}
Node::CODE => {
return Err(Error::new(
Origin::Api,
Kind::Invalid,
"unexpected code: node. clean_multiaddr should have been called",
));
}
code @ (Ip4::CODE | Ip6::CODE | DnsAddr::CODE) => {
return Err(Error::new(
Origin::Api,
Kind::Invalid,
format!("unexpected code: {code}. The address must be a local address {ma}"),
));
}
other => {
error!(target: "ockam_api", code = %other, "unsupported protocol");
return Err(Error::new(
Origin::Api,
Kind::Invalid,
format!("unsupported protocol {other}"),
));
}
}
}
Ok(rb.into())
}
pub struct MultiAddrToRouteResult {
pub flow_control_id: Option<FlowControlId>,
pub route: Route,
pub tcp_connection: Option<TcpConnection>,
}
pub async fn multiaddr_to_route(
ma: &MultiAddr,
tcp: &TcpTransport,
) -> Option<MultiAddrToRouteResult> {
let mut rb = Route::new();
let mut it = ma.iter().peekable();
let mut flow_control_id = None;
let mut number_of_tcp_hops = 0;
let mut tcp_connection = None;
while let Some(p) = it.next() {
match p.code() {
Ip4::CODE => {
if number_of_tcp_hops >= 1 {
return None; }
let ip4 = p.cast::<Ip4>()?;
let port = it.next()?.cast::<Tcp>()?;
let socket_addr = SocketAddrV4::new(*ip4, *port);
let options = TcpConnectionOptions::new();
flow_control_id = Some(options.flow_control_id().clone());
let connection = match tcp.connect(socket_addr.to_string(), options).await {
Ok(c) => c,
Err(error) => {
error!(%error, %socket_addr, "Couldn't connect to Ip4 address");
return None;
}
};
number_of_tcp_hops += 1;
rb = rb.append(connection.sender_address().clone());
tcp_connection = Some(connection);
}
Ip6::CODE => {
if number_of_tcp_hops >= 1 {
return None; }
let ip6 = p.cast::<Ip6>()?;
let port = it.next()?.cast::<Tcp>()?;
let socket_addr = SocketAddrV6::new(*ip6, *port, 0, 0);
let options = TcpConnectionOptions::new();
flow_control_id = Some(options.flow_control_id().clone());
let connection = match tcp.connect(socket_addr.to_string(), options).await {
Ok(c) => c,
Err(error) => {
error!(%error, %socket_addr, "Couldn't connect to Ip6 address");
return None;
}
};
number_of_tcp_hops += 1;
rb = rb.append(connection.sender_address().clone());
tcp_connection = Some(connection);
}
DnsAddr::CODE => {
if number_of_tcp_hops >= 1 {
return None; }
let host = p.cast::<DnsAddr>()?;
if let Some(p) = it.peek() {
if p.code() == Tcp::CODE {
let port = p.cast::<Tcp>()?;
let options = TcpConnectionOptions::new();
flow_control_id = Some(options.flow_control_id().clone());
let peer = format!("{}:{}", &*host, *port);
let connection = match tcp.connect(&peer, options).await {
Ok(c) => c,
Err(error) => {
error!(%error, %peer, "Couldn't connect to DNS address");
return None;
}
};
number_of_tcp_hops += 1;
rb = rb.append(connection.sender_address().clone());
tcp_connection = Some(connection);
let _ = it.next();
continue;
}
}
}
Worker::CODE => {
let local = p.cast::<Worker>()?;
rb = rb.append(Address::new(LOCAL, &*local))
}
Service::CODE => {
let local = p.cast::<Service>()?;
rb = rb.append(Address::new(LOCAL, &*local))
}
Secure::CODE => {
let local = p.cast::<Secure>()?;
rb = rb.append(Address::new(LOCAL, &*local))
}
other => {
error!(target: "ockam_api", code = %other, "unsupported protocol");
return None;
}
}
}
Some(MultiAddrToRouteResult {
flow_control_id,
tcp_connection,
route: rb.into(),
})
}
pub fn multiaddr_to_transport_route(ma: &MultiAddr) -> Option<Route> {
let mut route = Route::new();
let mut it = ma.iter().peekable();
while let Some(p) = it.next() {
match p.code() {
Ip4::CODE => {
let ip4 = p.cast::<Ip4>()?;
let port = it.next()?.cast::<Tcp>()?;
let socket_addr = SocketAddrV4::new(*ip4, *port);
route = route.append(Address::new(TCP, socket_addr.to_string()))
}
Ip6::CODE => {
let ip6 = p.cast::<Ip6>()?;
let port = it.next()?.cast::<Tcp>()?;
let socket_addr = SocketAddrV6::new(*ip6, *port, 0, 0);
route = route.append(Address::new(TransportType::new(1), socket_addr.to_string()))
}
DnsAddr::CODE => {
let host = p.cast::<DnsAddr>()?;
if let Some(p) = it.peek() {
if p.code() == Tcp::CODE {
let port = p.cast::<Tcp>()?;
let addr = format!("{}:{}", &*host, *port);
route = route.append(Address::new(TransportType::new(1), addr));
let _ = it.next();
continue;
}
}
}
Worker::CODE => {
let local = p.cast::<Worker>()?;
route = route.append(Address::new(LOCAL, &*local))
}
Service::CODE => {
let local = p.cast::<Service>()?;
route = route.append(Address::new(LOCAL, &*local))
}
Secure::CODE => {
let local = p.cast::<Secure>()?;
route = route.append(Address::new(LOCAL, &*local))
}
other => {
error!(target: "ockam_api", code = %other, "unsupported protocol");
return None;
}
}
}
Some(route.into())
}
pub fn multiaddr_to_addr(ma: &MultiAddr) -> Option<Address> {
let mut it = ma.iter().peekable();
let p = it.next()?;
match p.code() {
Worker::CODE => {
let local = p.cast::<Worker>()?;
Some(Address::new(LOCAL, &*local))
}
Service::CODE => {
let local = p.cast::<Service>()?;
Some(Address::new(LOCAL, &*local))
}
_ => None,
}
}
pub fn try_multiaddr_to_addr(ma: &MultiAddr) -> Result<Address, Error> {
multiaddr_to_addr(ma)
.ok_or_else(|| ApiError::core(format!("could not convert {ma} to address")))
}
pub fn route_to_multiaddr(r: &Route) -> Option<MultiAddr> {
let mut ma = MultiAddr::default();
for a in r.iter() {
ma.try_extend(&try_address_to_multiaddr(a).ok()?).ok()?
}
Some(ma)
}
pub fn try_address_to_multiaddr(a: &Address) -> Result<MultiAddr, Error> {
let mut ma = MultiAddr::default();
match a.transport_type() {
LOCAL => ma.push_back(Service::new(a.address()))?,
other => {
error!(target: "ockam_api", transport = %other, "unsupported transport type");
return Err(ApiError::core(format!("unknown transport type: {other}")));
}
}
Ok(ma)
}
pub fn addr_to_multiaddr<T: Into<Address>>(a: T) -> Option<MultiAddr> {
let r: Route = Route::from(a);
route_to_multiaddr(&r)
}
pub fn is_local_node(ma: &MultiAddr) -> miette::Result<bool> {
let at_rust_node;
if let Some(p) = ma.iter().next() {
match p.code() {
Project::CODE => {
at_rust_node = false;
}
Node::CODE => {
at_rust_node = true;
}
DnsAddr::CODE => {
at_rust_node = p
.cast::<DnsAddr>()
.map(|dnsaddr| (*dnsaddr).eq("localhost"))
.ok_or_else(|| miette!("Invalid \"dnsaddr\" value"))?;
}
Ip4::CODE => {
at_rust_node = p
.cast::<Ip4>()
.map(|ip4| ip4.is_loopback())
.ok_or_else(|| miette!("Invalid \"ip4\" value"))?;
}
Ip6::CODE => {
at_rust_node = p
.cast::<Ip6>()
.map(|ip6| ip6.is_loopback())
.ok_or_else(|| miette!("Invalid \"ip6\" value"))?;
}
_ => {
return Err(miette!("Invalid address, protocol not supported"));
}
}
Ok(at_rust_node)
} else {
Err(miette!("Invalid address"))
}
}
pub fn local_worker(code: &Code) -> Result<bool> {
match *code {
Node::CODE
| Space::CODE
| Project::CODE
| DnsAddr::CODE
| Ip4::CODE
| Ip6::CODE
| Tcp::CODE
| Secure::CODE => Ok(false),
Worker::CODE | Service::CODE => Ok(true),
_ => Err(ApiError::core(format!("unknown transport type: {code}"))),
}
}
#[cfg(test)]
pub mod test_utils {
use ockam::identity::utils::AttributesBuilder;
use ockam::identity::MAX_CREDENTIAL_VALIDITY;
use ockam::identity::{SecureChannels, PROJECT_MEMBER_SCHEMA, TRUST_CONTEXT_ID};
use ockam::Result;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_core::AsyncTryClone;
use ockam_node::Context;
use ockam_transport_tcp::TcpTransport;
use crate::cli_state::{random_name, CliState};
use crate::nodes::service::{
NodeManagerGeneralOptions, NodeManagerTransportOptions, NodeManagerTrustOptions,
};
use crate::nodes::InMemoryNode;
use crate::nodes::{NodeManagerWorker, NODEMANAGER_ADDR};
pub struct NodeManagerHandle {
pub cli_state: CliState,
pub node_manager: Arc<InMemoryNode>,
pub tcp: TcpTransport,
pub secure_channels: Arc<SecureChannels>,
}
impl Drop for NodeManagerHandle {
fn drop(&mut self) {
self.cli_state.delete().expect("cannot delete cli state");
}
}
pub async fn start_manager_for_tests(context: &mut Context) -> Result<NodeManagerHandle> {
let tcp = TcpTransport::create(context).await?;
let cli_state = CliState::test().await?;
let node_name = random_name();
cli_state.create_node(&node_name).await.unwrap();
let identifier = cli_state.get_node(&node_name).await?.identifier();
let identity = cli_state.get_identity(&identifier).await?;
let attributes = AttributesBuilder::with_schema(PROJECT_MEMBER_SCHEMA)
.with_attribute(TRUST_CONTEXT_ID.to_vec(), b"test_trust_context_id".to_vec())
.build();
let vault = cli_state
.get_or_create_default_named_vault()
.await?
.vault()
.await?;
let identities = cli_state.make_identities(vault).await?;
let credential = identities
.credentials()
.credentials_creation()
.issue_credential(
&identifier,
&identifier,
attributes,
MAX_CREDENTIAL_VALIDITY,
)
.await
.unwrap();
cli_state
.store_credential("credential", &identity, credential)
.await?;
let trust_context = cli_state
.create_trust_context(
Some("trust-context".to_string()),
None,
Some("credential".to_string()),
None,
None,
)
.await?;
let node_manager = InMemoryNode::new(
context,
NodeManagerGeneralOptions::new(cli_state.clone(), node_name, None, true, false),
NodeManagerTransportOptions::new(
FlowControls::generate_flow_control_id(), tcp.async_try_clone().await?,
),
NodeManagerTrustOptions::new(Some(trust_context)),
)
.await?;
let node_manager = Arc::new(node_manager);
let node_manager_worker = NodeManagerWorker::new(node_manager.clone());
context
.start_worker(NODEMANAGER_ADDR, node_manager_worker)
.await?;
let secure_channels = node_manager.secure_channels();
Ok(NodeManagerHandle {
cli_state,
node_manager,
tcp: tcp.async_try_clone().await?,
secure_channels,
})
}
}