use async_trait::async_trait;
use iroh::{endpoint::presets, Endpoint, EndpointAddr, EndpointId, SecretKey};
use tracing::debug;
use crate::endpoint::{MaEndpoint, DEFAULT_INBOX_CAPACITY};
use crate::error::{Error, Result};
use crate::inbox::Inbox;
use crate::iroh::channel::Channel;
use crate::outbox::Outbox;
use crate::resolve::DidResolver;
use did_ma::Message;
use crate::transport::resolve_endpoint_for_protocol;
pub struct IrohEndpoint {
endpoint: Endpoint,
protocols: Vec<String>,
}
impl IrohEndpoint {
pub async fn new(secret_bytes: [u8; 32]) -> Result<Self> {
let secret = SecretKey::from_bytes(&secret_bytes);
let endpoint = Endpoint::builder(presets::N0)
.secret_key(secret)
.bind()
.await
.map_err(|e| Error::Transport(format!("endpoint bind failed: {e}")))?;
let _ = endpoint.online().await;
debug!(
endpoint_id = %endpoint.id(),
"iroh endpoint online"
);
Ok(Self {
endpoint,
protocols: Vec::new(),
})
}
pub fn inner(&self) -> &Endpoint {
&self.endpoint
}
pub fn into_inner(self) -> Endpoint {
self.endpoint
}
pub fn endpoint_id(&self) -> EndpointId {
self.endpoint.id()
}
pub async fn open(&self, target: &str, protocol: &str) -> Result<Channel> {
let addr = self.resolve_addr(target)?;
let connection = self
.endpoint
.connect(addr, protocol.as_bytes())
.await
.map_err(|e| Error::Transport(format!("connect failed: {e}")))?;
let (send, _recv) = connection
.open_bi()
.await
.map_err(|e| Error::Transport(format!("open_bi failed: {e}")))?;
Ok(Channel::new(connection, send))
}
fn resolve_addr(&self, target: &str) -> Result<EndpointAddr> {
let target_id: EndpointId = target
.trim()
.parse()
.map_err(|e| Error::Transport(format!("invalid endpoint id: {e}")))?;
Ok(EndpointAddr::new(target_id))
}
pub async fn outbox(
&self,
resolver: &dyn DidResolver,
did: &str,
protocol: &str,
) -> Result<Outbox> {
let doc = resolver.resolve(did).await?;
let services = doc
.ma
.as_ref()
.and_then(|ma| ma.get("services").ok().flatten())
.and_then(|services| serde_json::to_value(services).ok());
let endpoint_id = resolve_endpoint_for_protocol(services.as_ref(), protocol).ok_or_else(|| {
Error::NoInboxTransport(format!("{} has no service for {}", did, protocol,))
})?;
let channel = self.open(&endpoint_id, protocol).await?;
Ok(Outbox::from_channel(
channel,
did.to_string(),
protocol.to_string(),
))
}
pub async fn close(self) {
self.endpoint.close().await;
}
}
#[async_trait]
impl MaEndpoint for IrohEndpoint {
fn id(&self) -> String {
self.endpoint.id().to_string()
}
fn service(&mut self, protocol: &str) -> Inbox<Message> {
if !self.protocols.contains(&protocol.to_string()) {
self.protocols.push(protocol.to_string());
}
Inbox::new(DEFAULT_INBOX_CAPACITY)
}
fn services(&self) -> Vec<String> {
let id = self.endpoint.id();
self.protocols
.iter()
.map(|proto| format!("/iroh/{}{}", id, proto))
.collect()
}
async fn send_to(&self, target: &str, protocol: &str, message: &Message) -> Result<()> {
message.headers().validate()?;
let cbor = message.to_cbor()?;
let mut channel = self.open(target, protocol).await?;
channel.send(&cbor).await?;
channel.close();
Ok(())
}
}