pub mod error;
pub const THIN_CLIENT_SUCCESS: u8 = 0;
pub const THIN_CLIENT_ERROR_CONNECTION_LOST: u8 = 1;
pub const THIN_CLIENT_ERROR_TIMEOUT: u8 = 2;
pub const THIN_CLIENT_ERROR_INVALID_REQUEST: u8 = 3;
pub const THIN_CLIENT_ERROR_INTERNAL_ERROR: u8 = 4;
pub const THIN_CLIENT_ERROR_MAX_RETRIES: u8 = 5;
pub const THIN_CLIENT_ERROR_INVALID_CHANNEL: u8 = 6;
pub const THIN_CLIENT_ERROR_CHANNEL_NOT_FOUND: u8 = 7;
pub const THIN_CLIENT_ERROR_PERMISSION_DENIED: u8 = 8;
pub const THIN_CLIENT_ERROR_INVALID_PAYLOAD: u8 = 9;
pub const THIN_CLIENT_ERROR_SERVICE_UNAVAILABLE: u8 = 10;
pub const THIN_CLIENT_ERROR_DUPLICATE_CAPABILITY: u8 = 11;
pub const THIN_CLIENT_ERROR_COURIER_CACHE_CORRUPTION: u8 = 12;
pub const THIN_CLIENT_PROPAGATION_ERROR: u8 = 13;
pub fn thin_client_error_to_string(error_code: u8) -> &'static str {
match error_code {
THIN_CLIENT_SUCCESS => "Success",
THIN_CLIENT_ERROR_CONNECTION_LOST => "Connection lost",
THIN_CLIENT_ERROR_TIMEOUT => "Timeout",
THIN_CLIENT_ERROR_INVALID_REQUEST => "Invalid request",
THIN_CLIENT_ERROR_INTERNAL_ERROR => "Internal error",
THIN_CLIENT_ERROR_MAX_RETRIES => "Maximum retries exceeded",
THIN_CLIENT_ERROR_INVALID_CHANNEL => "Invalid channel",
THIN_CLIENT_ERROR_CHANNEL_NOT_FOUND => "Channel not found",
THIN_CLIENT_ERROR_PERMISSION_DENIED => "Permission denied",
THIN_CLIENT_ERROR_INVALID_PAYLOAD => "Invalid payload",
THIN_CLIENT_ERROR_SERVICE_UNAVAILABLE => "Service unavailable",
THIN_CLIENT_ERROR_DUPLICATE_CAPABILITY => "Duplicate capability",
THIN_CLIENT_ERROR_COURIER_CACHE_CORRUPTION => "Courier cache corruption",
THIN_CLIENT_PROPAGATION_ERROR => "Propagation error",
_ => "Unknown thin client error code",
}
}
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::fs;
use serde::Deserialize;
use serde_json::json;
use serde_cbor::{from_slice, Value};
use tokio::sync::{Mutex, RwLock, mpsc};
use tokio::task::JoinHandle;
use tokio::net::{TcpStream, UnixStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf as TcpReadHalf, OwnedWriteHalf as TcpWriteHalf};
use tokio::net::unix::{OwnedReadHalf as UnixReadHalf, OwnedWriteHalf as UnixWriteHalf};
use blake2::{Blake2b, Digest};
use generic_array::typenum::U32;
use rand::RngCore;
use log::{debug, error};
use crate::error::ThinClientError;
#[derive(Debug, Clone)]
pub struct WriteChannelReply {
pub send_message_payload: Vec<u8>,
pub current_message_index: Vec<u8>,
pub next_message_index: Vec<u8>,
pub envelope_descriptor: Vec<u8>,
pub envelope_hash: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct ReadChannelReply {
pub send_message_payload: Vec<u8>,
pub current_message_index: Vec<u8>,
pub next_message_index: Vec<u8>,
pub reply_index: Option<u8>,
pub envelope_descriptor: Vec<u8>,
pub envelope_hash: Vec<u8>,
}
const SURB_ID_SIZE: usize = 16;
const MESSAGE_ID_SIZE: usize = 16;
const QUERY_ID_SIZE: usize = 16;
#[derive(Debug, Clone)]
pub struct ServiceDescriptor {
pub recipient_queue_id: Vec<u8>,
pub mix_descriptor: BTreeMap<Value, Value>,
}
impl ServiceDescriptor {
pub fn to_destination(&self) -> (Vec<u8>, Vec<u8>) {
let identity_key = self
.mix_descriptor
.get(&Value::Text("IdentityKey".to_string()))
.and_then(|v| match v {
Value::Bytes(b) => Some(b),
_ => None,
})
.cloned()
.unwrap_or_else(Vec::new);
let mut hasher = Blake2b::<U32>::new();
hasher.update(&identity_key);
let provider_id_hash = hasher.finalize().to_vec();
(provider_id_hash, self.recipient_queue_id.clone())
}
}
#[derive(Debug, Deserialize, Clone)]
pub struct Geometry {
#[serde(rename = "PacketLength")]
pub packet_length: usize,
#[serde(rename = "NrHops")]
pub nr_hops: usize,
#[serde(rename = "HeaderLength")]
pub header_length: usize,
#[serde(rename = "RoutingInfoLength")]
pub routing_info_length: usize,
#[serde(rename = "PerHopRoutingInfoLength")]
pub per_hop_routing_info_length: usize,
#[serde(rename = "SURBLength")]
pub surb_length: usize,
#[serde(rename = "SphinxPlaintextHeaderLength")]
pub sphinx_plaintext_header_length: usize,
#[serde(rename = "PayloadTagLength")]
pub payload_tag_length: usize,
#[serde(rename = "ForwardPayloadLength")]
pub forward_payload_length: usize,
#[serde(rename = "UserForwardPayloadLength")]
pub user_forward_payload_length: usize,
#[serde(rename = "NextNodeHopLength")]
pub next_node_hop_length: usize,
#[serde(rename = "SPRPKeyMaterialLength")]
pub sprp_key_material_length: usize,
#[serde(rename = "NIKEName")]
pub nike_name: String,
#[serde(rename = "KEMName")]
pub kem_name: String,
}
#[derive(Debug, Deserialize)]
pub struct ConfigFile {
#[serde(rename = "SphinxGeometry")]
pub sphinx_geometry: Geometry,
#[serde(rename = "Network")]
pub network: String,
#[serde(rename = "Address")]
pub address: String,
}
impl ConfigFile {
pub fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let contents = fs::read_to_string(path)?;
let config: ConfigFile = toml::from_str(&contents)?;
Ok(config)
}
}
#[derive(Clone)]
pub struct Config {
pub network: String,
pub address: String,
pub sphinx_geometry: Geometry,
pub on_connection_status: Option<Arc<dyn Fn(&BTreeMap<Value, Value>) + Send + Sync>>,
pub on_new_pki_document: Option<Arc<dyn Fn(&BTreeMap<Value, Value>) + Send + Sync>>,
pub on_message_sent: Option<Arc<dyn Fn(&BTreeMap<Value, Value>) + Send + Sync>>,
pub on_message_reply: Option<Arc<dyn Fn(&BTreeMap<Value, Value>) + Send + Sync>>,
}
impl Config {
pub fn new(filepath: &str) -> Result<Self, Box<dyn std::error::Error>> {
let contents = fs::read_to_string(filepath)?;
let parsed: ConfigFile = toml::from_str(&contents)?;
Ok(Self {
network: parsed.network,
address: parsed.address,
sphinx_geometry: parsed.sphinx_geometry,
on_connection_status: None,
on_new_pki_document: None,
on_message_sent: None,
on_message_reply: None,
})
}
}
pub enum ReadHalf {
Tcp(TcpReadHalf),
Unix(UnixReadHalf),
}
pub enum WriteHalf {
Tcp(TcpWriteHalf),
Unix(UnixWriteHalf),
}
pub struct EventSinkReceiver {
receiver: mpsc::UnboundedReceiver<BTreeMap<Value, Value>>,
sender: mpsc::UnboundedSender<BTreeMap<Value, Value>>,
drain_remove: mpsc::UnboundedSender<mpsc::UnboundedSender<BTreeMap<Value, Value>>>,
}
impl EventSinkReceiver {
pub async fn recv(&mut self) -> Option<BTreeMap<Value, Value>> {
self.receiver.recv().await
}
}
impl Drop for EventSinkReceiver {
fn drop(&mut self) {
if let Err(_) = self.drain_remove.send(self.sender.clone()) {
debug!("Failed to remove drain channel - event sink worker may be stopped");
}
}
}
pub struct ThinClient {
read_half: Mutex<ReadHalf>,
write_half: Mutex<WriteHalf>,
config: Config,
pki_doc: Arc<RwLock<Option<BTreeMap<Value, Value>>>>,
worker_task: Mutex<Option<JoinHandle<()>>>,
event_sink_task: Mutex<Option<JoinHandle<()>>>,
shutdown: Arc<AtomicBool>,
is_connected: Arc<AtomicBool>,
event_sink: mpsc::UnboundedSender<BTreeMap<Value, Value>>,
drain_add: mpsc::UnboundedSender<mpsc::UnboundedSender<BTreeMap<Value, Value>>>,
drain_remove: mpsc::UnboundedSender<mpsc::UnboundedSender<BTreeMap<Value, Value>>>,
}
impl ThinClient {
pub async fn new(config: Config) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
let (event_sink_tx, event_sink_rx) = mpsc::unbounded_channel();
let (drain_add_tx, drain_add_rx) = mpsc::unbounded_channel();
let (drain_remove_tx, drain_remove_rx) = mpsc::unbounded_channel();
let client = match config.network.to_uppercase().as_str() {
"TCP" => {
let socket = TcpStream::connect(&config.address).await?;
let (read_half, write_half) = socket.into_split();
Arc::new(Self {
read_half: Mutex::new(ReadHalf::Tcp(read_half)),
write_half: Mutex::new(WriteHalf::Tcp(write_half)),
config,
pki_doc: Arc::new(RwLock::new(None)),
worker_task: Mutex::new(None),
event_sink_task: Mutex::new(None),
shutdown: Arc::new(AtomicBool::new(false)),
is_connected: Arc::new(AtomicBool::new(false)),
event_sink: event_sink_tx.clone(),
drain_add: drain_add_tx.clone(),
drain_remove: drain_remove_tx.clone(),
})
}
"UNIX" => {
let path = if config.address.starts_with('@') {
let mut p = String::from("\0");
p.push_str(&config.address[1..]);
p
} else {
config.address.clone()
};
let socket = UnixStream::connect(path).await?;
let (read_half, write_half) = socket.into_split();
Arc::new(Self {
read_half: Mutex::new(ReadHalf::Unix(read_half)),
write_half: Mutex::new(WriteHalf::Unix(write_half)),
config,
pki_doc: Arc::new(RwLock::new(None)),
worker_task: Mutex::new(None),
event_sink_task: Mutex::new(None),
shutdown: Arc::new(AtomicBool::new(false)),
is_connected: Arc::new(AtomicBool::new(false)),
event_sink: event_sink_tx,
drain_add: drain_add_tx,
drain_remove: drain_remove_tx,
})
}
_ => {
return Err(format!("Unknown network type: {}", config.network).into());
}
};
let client_clone = Arc::clone(&client);
let task = tokio::spawn(async move { client_clone.worker_loop().await });
*client.worker_task.lock().await = Some(task);
let client_clone2 = Arc::clone(&client);
let event_sink_task = tokio::spawn(async move {
client_clone2.event_sink_worker(event_sink_rx, drain_add_rx, drain_remove_rx).await
});
*client.event_sink_task.lock().await = Some(event_sink_task);
debug!("✅ ThinClient initialized with worker loop and event sink started.");
Ok(client)
}
pub async fn stop(&self) {
debug!("Stopping ThinClient...");
self.shutdown.store(true, Ordering::Relaxed);
let mut write_half = self.write_half.lock().await;
let _ = match &mut *write_half {
WriteHalf::Tcp(wh) => wh.shutdown().await,
WriteHalf::Unix(wh) => wh.shutdown().await,
};
if let Some(worker) = self.worker_task.lock().await.take() {
worker.abort();
}
debug!("✅ ThinClient stopped.");
}
pub fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::Relaxed)
}
pub fn event_sink(&self) -> EventSinkReceiver {
let (tx, rx) = mpsc::unbounded_channel();
if let Err(_) = self.drain_add.send(tx.clone()) {
debug!("Failed to add drain channel - event sink worker may be stopped");
}
EventSinkReceiver {
receiver: rx,
sender: tx,
drain_remove: self.drain_remove.clone(),
}
}
pub fn new_message_id() -> Vec<u8> {
let mut id = vec![0; MESSAGE_ID_SIZE];
rand::thread_rng().fill_bytes(&mut id);
id
}
pub fn new_surb_id() -> Vec<u8> {
let mut id = vec![0; SURB_ID_SIZE];
rand::thread_rng().fill_bytes(&mut id);
id
}
pub fn new_query_id() -> Vec<u8> {
let mut id = vec![0; QUERY_ID_SIZE];
rand::thread_rng().fill_bytes(&mut id);
id
}
async fn update_pki_document(&self, new_pki_doc: BTreeMap<Value, Value>) {
let mut pki_doc_lock = self.pki_doc.write().await;
*pki_doc_lock = Some(new_pki_doc);
debug!("PKI document updated.");
}
pub async fn pki_document(&self) -> BTreeMap<Value, Value> {
self.pki_doc.read().await.clone().expect("❌ PKI document is missing!")
}
pub async fn get_service(&self, service_name: &str) -> Result<ServiceDescriptor, ThinClientError> {
let doc = self.pki_doc.read().await.clone().ok_or(ThinClientError::MissingPkiDocument)?;
let services = find_services(service_name, &doc);
services.into_iter().next().ok_or(ThinClientError::ServiceNotFound)
}
pub async fn get_courier_destination(&self) -> Result<(Vec<u8>, Vec<u8>), ThinClientError> {
let courier_service = self.get_service("courier").await?;
let (dest_node, dest_queue) = courier_service.to_destination();
Ok((dest_node, dest_queue))
}
async fn recv(&self) -> Result<BTreeMap<Value, Value>, ThinClientError> {
let mut length_prefix = [0; 4];
{
let mut read_half = self.read_half.lock().await;
match &mut *read_half {
ReadHalf::Tcp(rh) => rh.read_exact(&mut length_prefix).await.map_err(ThinClientError::IoError)?,
ReadHalf::Unix(rh) => rh.read_exact(&mut length_prefix).await.map_err(ThinClientError::IoError)?,
};
}
let message_length = u32::from_be_bytes(length_prefix) as usize;
let mut buffer = vec![0; message_length];
{
let mut read_half = self.read_half.lock().await;
match &mut *read_half {
ReadHalf::Tcp(rh) => rh.read_exact(&mut buffer).await.map_err(ThinClientError::IoError)?,
ReadHalf::Unix(rh) => rh.read_exact(&mut buffer).await.map_err(ThinClientError::IoError)?,
};
}
let response: BTreeMap<Value, Value> = match from_slice(&buffer) {
Ok(parsed) => {
parsed
}
Err(err) => {
error!("❌ Failed to parse CBOR: {:?}", err);
return Err(ThinClientError::CborError(err));
}
};
Ok(response)
}
fn parse_status(&self, event: &BTreeMap<Value, Value>) {
let is_connected = event.get(&Value::Text("is_connected".to_string()))
.and_then(|v| match v {
Value::Bool(b) => Some(*b),
_ => None,
})
.unwrap_or(false);
self.is_connected.store(is_connected, Ordering::Relaxed);
if is_connected {
debug!("✅ Daemon is connected to mixnet - full functionality available.");
} else {
debug!("📴 Daemon is not connected to mixnet - entering offline mode (channel operations will work).");
}
}
async fn parse_pki_doc(&self, event: &BTreeMap<Value, Value>) {
if let Some(Value::Bytes(payload)) = event.get(&Value::Text("payload".to_string())) {
match serde_cbor::from_slice::<BTreeMap<Value, Value>>(payload) {
Ok(raw_pki_doc) => {
self.update_pki_document(raw_pki_doc).await;
debug!("✅ PKI document successfully parsed.");
}
Err(err) => {
error!("❌ Failed to parse PKI document: {:?}", err);
}
}
} else {
error!("❌ Missing 'payload' field in PKI document event.");
}
}
async fn handle_response(&self, response: BTreeMap<Value, Value>) {
assert!(!response.is_empty(), "❌ Received an empty response!");
if let Some(Value::Map(event)) = response.get(&Value::Text("connection_status_event".to_string())) {
debug!("🔄 Connection status event received.");
self.parse_status(event);
if let Some(cb) = self.config.on_connection_status.as_ref() {
cb(event);
}
return;
}
if let Some(Value::Map(event)) = response.get(&Value::Text("new_pki_document_event".to_string())) {
debug!("📜 New PKI document event received.");
self.parse_pki_doc(event).await;
if let Some(cb) = self.config.on_new_pki_document.as_ref() {
cb(event);
}
return;
}
if let Some(Value::Map(event)) = response.get(&Value::Text("message_sent_event".to_string())) {
debug!("📨 Message sent event received.");
if let Some(cb) = self.config.on_message_sent.as_ref() {
cb(event);
}
return;
}
if let Some(Value::Map(event)) = response.get(&Value::Text("message_reply_event".to_string())) {
debug!("📩 Message reply event received.");
if let Some(cb) = self.config.on_message_reply.as_ref() {
cb(event);
}
return;
}
error!("❌ Unknown event type received: {:?}", response);
}
async fn worker_loop(&self) {
debug!("Worker loop started");
while !self.shutdown.load(Ordering::Relaxed) {
match self.recv().await {
Ok(response) => {
if let Err(_) = self.event_sink.send(response.clone()) {
debug!("Event sink channel closed, stopping worker loop");
break;
}
self.handle_response(response).await;
},
Err(_) if self.shutdown.load(Ordering::Relaxed) => break,
Err(err) => error!("Error in recv: {}", err),
}
}
debug!("Worker loop exited.");
}
async fn event_sink_worker(
&self,
mut event_sink_rx: mpsc::UnboundedReceiver<BTreeMap<Value, Value>>,
mut drain_add_rx: mpsc::UnboundedReceiver<mpsc::UnboundedSender<BTreeMap<Value, Value>>>,
mut drain_remove_rx: mpsc::UnboundedReceiver<mpsc::UnboundedSender<BTreeMap<Value, Value>>>,
) {
debug!("Event sink worker started");
let mut drains: HashMap<usize, mpsc::UnboundedSender<BTreeMap<Value, Value>>> = HashMap::new();
let mut next_id = 0usize;
loop {
tokio::select! {
_ = async { while !self.shutdown.load(Ordering::Relaxed) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } => {
debug!("Event sink worker shutting down");
break;
}
Some(drain) = drain_add_rx.recv() => {
drains.insert(next_id, drain);
next_id += 1;
debug!("Added new drain channel, total drains: {}", drains.len());
}
Some(drain_to_remove) = drain_remove_rx.recv() => {
drains.retain(|_, drain| !std::ptr::addr_eq(drain, &drain_to_remove));
debug!("Removed drain channel, total drains: {}", drains.len());
}
Some(event) = event_sink_rx.recv() => {
let mut bad_drains = Vec::new();
for (id, drain) in &drains {
if let Err(_) = drain.send(event.clone()) {
bad_drains.push(*id);
}
}
for id in bad_drains {
drains.remove(&id);
}
}
}
}
debug!("Event sink worker exited.");
}
async fn send_cbor_request(&self, request: BTreeMap<Value, Value>) -> Result<(), ThinClientError> {
let encoded_request = serde_cbor::to_vec(&serde_cbor::Value::Map(request))?;
let length_prefix = (encoded_request.len() as u32).to_be_bytes();
let mut write_half = self.write_half.lock().await;
match &mut *write_half {
WriteHalf::Tcp(wh) => {
wh.write_all(&length_prefix).await?;
wh.write_all(&encoded_request).await?;
}
WriteHalf::Unix(wh) => {
wh.write_all(&length_prefix).await?;
wh.write_all(&encoded_request).await?;
}
}
debug!("✅ Request sent successfully.");
Ok(())
}
pub async fn send_message_without_reply(
&self,
payload: &[u8],
dest_node: Vec<u8>,
dest_queue: Vec<u8>
) -> Result<(), ThinClientError> {
if !self.is_connected() {
return Err(ThinClientError::OfflineMode("cannot send message in offline mode - daemon not connected to mixnet".to_string()));
}
let mut send_message = BTreeMap::new();
send_message.insert(Value::Text("id".to_string()), Value::Null); send_message.insert(Value::Text("with_surb".to_string()), Value::Bool(false));
send_message.insert(Value::Text("surbid".to_string()), Value::Null); send_message.insert(Value::Text("destination_id_hash".to_string()), Value::Bytes(dest_node));
send_message.insert(Value::Text("recipient_queue_id".to_string()), Value::Bytes(dest_queue));
send_message.insert(Value::Text("payload".to_string()), Value::Bytes(payload.to_vec()));
let mut request = BTreeMap::new();
request.insert(Value::Text("send_message".to_string()), Value::Map(send_message));
self.send_cbor_request(request).await
}
pub async fn send_message(
&self,
surb_id: Vec<u8>,
payload: &[u8],
dest_node: Vec<u8>,
dest_queue: Vec<u8>
) -> Result<(), ThinClientError> {
if !self.is_connected() {
return Err(ThinClientError::OfflineMode("cannot send message in offline mode - daemon not connected to mixnet".to_string()));
}
let mut send_message = BTreeMap::new();
send_message.insert(Value::Text("id".to_string()), Value::Null); send_message.insert(Value::Text("with_surb".to_string()), Value::Bool(true));
send_message.insert(Value::Text("surbid".to_string()), Value::Bytes(surb_id));
send_message.insert(Value::Text("destination_id_hash".to_string()), Value::Bytes(dest_node));
send_message.insert(Value::Text("recipient_queue_id".to_string()), Value::Bytes(dest_queue));
send_message.insert(Value::Text("payload".to_string()), Value::Bytes(payload.to_vec()));
let mut request = BTreeMap::new();
request.insert(Value::Text("send_message".to_string()), Value::Map(send_message));
self.send_cbor_request(request).await
}
pub async fn send_reliable_message(
&self,
message_id: Vec<u8>,
payload: &[u8],
dest_node: Vec<u8>,
dest_queue: Vec<u8>
) -> Result<(), ThinClientError> {
if !self.is_connected() {
return Err(ThinClientError::OfflineMode("cannot send reliable message in offline mode - daemon not connected to mixnet".to_string()));
}
let mut send_arq_message = BTreeMap::new();
send_arq_message.insert(Value::Text("id".to_string()), Value::Bytes(message_id));
send_arq_message.insert(Value::Text("with_surb".to_string()), Value::Bool(true));
send_arq_message.insert(Value::Text("surbid".to_string()), Value::Null); send_arq_message.insert(Value::Text("destination_id_hash".to_string()), Value::Bytes(dest_node));
send_arq_message.insert(Value::Text("recipient_queue_id".to_string()), Value::Bytes(dest_queue));
send_arq_message.insert(Value::Text("payload".to_string()), Value::Bytes(payload.to_vec()));
let mut request = BTreeMap::new();
request.insert(Value::Text("send_arq_message".to_string()), Value::Map(send_arq_message));
self.send_cbor_request(request).await
}
pub async fn create_write_channel(&self) -> Result<(u16, Vec<u8>, Vec<u8>), ThinClientError> {
let query_id = Self::new_query_id();
let mut create_write_channel = BTreeMap::new();
create_write_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
let mut request = BTreeMap::new();
request.insert(Value::Text("create_write_channel".to_string()), Value::Map(create_write_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("create_write_channel_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("CreateWriteChannel failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("CreateWriteChannel failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
let read_cap = match reply.get(&Value::Text("read_cap".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("read_cap is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing read_cap in response".to_string())),
};
let write_cap = match reply.get(&Value::Text("write_cap".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("write_cap is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing write_cap in response".to_string())),
};
return Ok((channel_id, read_cap, write_cap));
}
}
}
pub async fn create_read_channel(&self, read_cap: Vec<u8>) -> Result<u16, ThinClientError> {
let query_id = Self::new_query_id();
let mut create_read_channel = BTreeMap::new();
create_read_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
create_read_channel.insert(Value::Text("read_cap".to_string()), Value::Bytes(read_cap));
let mut request = BTreeMap::new();
request.insert(Value::Text("create_read_channel".to_string()), Value::Map(create_read_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("create_read_channel_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("CreateReadChannel failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("CreateReadChannel failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
return Ok(channel_id);
}
}
}
pub async fn write_channel(&self, channel_id: u16, payload: &[u8]) -> Result<WriteChannelReply, ThinClientError> {
let query_id = Self::new_query_id();
let mut write_channel = BTreeMap::new();
write_channel.insert(Value::Text("channel_id".to_string()), Value::Integer(channel_id.into()));
write_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
write_channel.insert(Value::Text("payload".to_string()), Value::Bytes(payload.to_vec()));
let mut request = BTreeMap::new();
request.insert(Value::Text("write_channel".to_string()), Value::Map(write_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("write_channel_reply".to_string())) {
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("WriteChannel failed: {}", err)));
}
let send_message_payload = reply.get(&Value::Text("send_message_payload".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing send_message_payload in response".to_string()))?;
let current_message_index = match reply.get(&Value::Text("current_message_index".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("current_message_index is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing current_message_index in response".to_string())),
};
let next_message_index = match reply.get(&Value::Text("next_message_index".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("next_message_index is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing next_message_index in response".to_string())),
};
let envelope_descriptor = reply.get(&Value::Text("envelope_descriptor".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing envelope_descriptor in response".to_string()))?;
let envelope_hash = reply.get(&Value::Text("envelope_hash".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing envelope_hash in response".to_string()))?;
return Ok(WriteChannelReply {
send_message_payload,
current_message_index,
next_message_index,
envelope_descriptor,
envelope_hash,
});
}
}
}
pub async fn read_channel(&self, channel_id: u16, message_box_index: Option<&[u8]>, reply_index: Option<u8>) -> Result<ReadChannelReply, ThinClientError> {
let query_id = Self::new_query_id();
let mut read_channel = BTreeMap::new();
read_channel.insert(Value::Text("channel_id".to_string()), Value::Integer(channel_id.into()));
read_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
if let Some(index) = message_box_index {
read_channel.insert(Value::Text("message_box_index".to_string()), Value::Bytes(index.to_vec()));
}
if let Some(idx) = reply_index {
read_channel.insert(Value::Text("reply_index".to_string()), Value::Integer(idx.into()));
}
let mut request = BTreeMap::new();
request.insert(Value::Text("read_channel".to_string()), Value::Map(read_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("read_channel_reply".to_string())) {
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("ReadChannel failed: {}", err)));
}
let send_message_payload = reply.get(&Value::Text("send_message_payload".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing send_message_payload in response".to_string()))?;
let current_message_index = match reply.get(&Value::Text("current_message_index".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("current_message_index is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing current_message_index in response".to_string())),
};
let next_message_index = match reply.get(&Value::Text("next_message_index".to_string())) {
Some(Value::Bytes(bytes)) => bytes.clone(),
Some(_) => return Err(ThinClientError::Other("next_message_index is unexpected type".to_string())),
None => return Err(ThinClientError::Other("Missing next_message_index in response".to_string())),
};
let used_reply_index = reply.get(&Value::Text("reply_index".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u8), _ => None });
let envelope_descriptor = reply.get(&Value::Text("envelope_descriptor".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing envelope_descriptor in response".to_string()))?;
let envelope_hash = reply.get(&Value::Text("envelope_hash".to_string()))
.and_then(|v| match v { Value::Bytes(b) => Some(b.clone()), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing envelope_hash in response".to_string()))?;
return Ok(ReadChannelReply {
send_message_payload,
current_message_index,
next_message_index,
reply_index: used_reply_index,
envelope_descriptor,
envelope_hash,
});
}
}
}
pub async fn resume_write_channel(&self, write_cap: Vec<u8>, message_box_index: Option<Vec<u8>>) -> Result<u16, ThinClientError> {
let query_id = Self::new_query_id();
let mut resume_write_channel = BTreeMap::new();
resume_write_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
resume_write_channel.insert(Value::Text("write_cap".to_string()), Value::Bytes(write_cap));
if let Some(index) = message_box_index {
resume_write_channel.insert(Value::Text("message_box_index".to_string()), Value::Bytes(index));
}
let mut request = BTreeMap::new();
request.insert(Value::Text("resume_write_channel".to_string()), Value::Map(resume_write_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("resume_write_channel_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("ResumeWriteChannel failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("ResumeWriteChannel failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
return Ok(channel_id);
}
}
}
pub async fn resume_read_channel(&self, read_cap: Vec<u8>, next_message_index: Option<Vec<u8>>, reply_index: Option<u8>) -> Result<u16, ThinClientError> {
let query_id = Self::new_query_id();
let mut resume_read_channel = BTreeMap::new();
resume_read_channel.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
resume_read_channel.insert(Value::Text("read_cap".to_string()), Value::Bytes(read_cap));
if let Some(index) = next_message_index {
resume_read_channel.insert(Value::Text("next_message_index".to_string()), Value::Bytes(index));
}
if let Some(index) = reply_index {
resume_read_channel.insert(Value::Text("reply_index".to_string()), Value::Integer(index.into()));
}
let mut request = BTreeMap::new();
request.insert(Value::Text("resume_read_channel".to_string()), Value::Map(resume_read_channel));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("resume_read_channel_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("ResumeReadChannel failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("ResumeReadChannel failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
return Ok(channel_id);
}
}
}
pub async fn resume_write_channel_query(
&self,
write_cap: Vec<u8>,
message_box_index: Vec<u8>,
envelope_descriptor: Vec<u8>,
envelope_hash: Vec<u8>,
) -> Result<u16, ThinClientError> {
let query_id = Self::new_query_id();
let mut resume_write_channel_query = BTreeMap::new();
resume_write_channel_query.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
resume_write_channel_query.insert(Value::Text("write_cap".to_string()), Value::Bytes(write_cap));
resume_write_channel_query.insert(Value::Text("message_box_index".to_string()), Value::Bytes(message_box_index));
resume_write_channel_query.insert(Value::Text("envelope_descriptor".to_string()), Value::Bytes(envelope_descriptor));
resume_write_channel_query.insert(Value::Text("envelope_hash".to_string()), Value::Bytes(envelope_hash));
let mut request = BTreeMap::new();
request.insert(Value::Text("resume_write_channel_query".to_string()), Value::Map(resume_write_channel_query));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("resume_write_channel_query_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("ResumeWriteChannelQuery failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("ResumeWriteChannelQuery failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
return Ok(channel_id);
}
}
}
pub async fn resume_read_channel_query(
&self,
read_cap: Vec<u8>,
next_message_index: Vec<u8>,
reply_index: Option<u8>,
envelope_descriptor: Vec<u8>,
envelope_hash: Vec<u8>,
) -> Result<u16, ThinClientError> {
let query_id = Self::new_query_id();
let mut resume_read_channel_query = BTreeMap::new();
resume_read_channel_query.insert(Value::Text("query_id".to_string()), Value::Bytes(query_id.clone()));
resume_read_channel_query.insert(Value::Text("read_cap".to_string()), Value::Bytes(read_cap));
resume_read_channel_query.insert(Value::Text("next_message_index".to_string()), Value::Bytes(next_message_index));
if let Some(index) = reply_index {
resume_read_channel_query.insert(Value::Text("reply_index".to_string()), Value::Integer(index.into()));
}
resume_read_channel_query.insert(Value::Text("envelope_descriptor".to_string()), Value::Bytes(envelope_descriptor));
resume_read_channel_query.insert(Value::Text("envelope_hash".to_string()), Value::Bytes(envelope_hash));
let mut request = BTreeMap::new();
request.insert(Value::Text("resume_read_channel_query".to_string()), Value::Map(resume_read_channel_query));
self.send_cbor_request(request).await?;
let mut event_sink = self.event_sink();
loop {
let response = event_sink.recv().await
.ok_or_else(|| ThinClientError::Other("Event sink closed".to_string()))?;
if let Some(Value::Map(reply)) = response.get(&Value::Text("resume_read_channel_query_reply".to_string())) {
if let Some(Value::Integer(error_code)) = reply.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("ResumeReadChannelQuery failed with error code: {}", error_code)));
}
}
if let Some(Value::Text(err)) = reply.get(&Value::Text("err".to_string())) {
return Err(ThinClientError::Other(format!("ResumeReadChannelQuery failed: {}", err)));
}
let channel_id = reply.get(&Value::Text("channel_id".to_string()))
.and_then(|v| match v { Value::Integer(i) => Some(*i as u16), _ => None })
.ok_or_else(|| ThinClientError::Other("Missing channel_id in response".to_string()))?;
return Ok(channel_id);
}
}
}
pub async fn send_channel_query(
&self,
channel_id: u16,
payload: &[u8],
dest_node: Vec<u8>,
dest_queue: Vec<u8>,
message_id: Vec<u8>,
) -> Result<(), ThinClientError> {
if !self.is_connected() {
return Err(ThinClientError::OfflineMode("cannot send channel query in offline mode - daemon not connected to mixnet".to_string()));
}
let mut send_channel_query = BTreeMap::new();
send_channel_query.insert(Value::Text("message_id".to_string()), Value::Bytes(message_id));
send_channel_query.insert(Value::Text("channel_id".to_string()), Value::Integer(channel_id.into()));
send_channel_query.insert(Value::Text("destination_id_hash".to_string()), Value::Bytes(dest_node));
send_channel_query.insert(Value::Text("recipient_queue_id".to_string()), Value::Bytes(dest_queue));
send_channel_query.insert(Value::Text("payload".to_string()), Value::Bytes(payload.to_vec()));
let mut request = BTreeMap::new();
request.insert(Value::Text("send_channel_query".to_string()), Value::Map(send_channel_query));
self.send_cbor_request(request).await
}
pub async fn send_channel_query_await_reply(
&self,
channel_id: u16,
payload: &[u8],
dest_node: Vec<u8>,
dest_queue: Vec<u8>,
message_id: Vec<u8>,
) -> Result<Vec<u8>, ThinClientError> {
let mut event_sink = self.event_sink();
self.send_channel_query(channel_id, payload, dest_node, dest_queue, message_id.clone()).await?;
loop {
match event_sink.recv().await {
Some(response) => {
if let Some(Value::Map(event)) = response.get(&Value::Text("channel_query_sent_event".to_string())) {
if let Some(Value::Bytes(reply_message_id)) = event.get(&Value::Text("message_id".to_string())) {
if reply_message_id == &message_id {
if let Some(Value::Integer(error_code)) = event.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("Channel query send failed with error code: {}", error_code)));
}
}
continue;
}
}
}
if let Some(Value::Map(event)) = response.get(&Value::Text("channel_query_reply_event".to_string())) {
if let Some(Value::Bytes(reply_message_id)) = event.get(&Value::Text("message_id".to_string())) {
if reply_message_id == &message_id {
if let Some(Value::Integer(error_code)) = event.get(&Value::Text("error_code".to_string())) {
if *error_code != 0 {
return Err(ThinClientError::Other(format!("Channel query failed with error code: {}", error_code)));
}
}
if let Some(Value::Bytes(reply_payload)) = event.get(&Value::Text("payload".to_string())) {
return Ok(reply_payload.clone());
} else {
return Err(ThinClientError::Other("Missing payload in channel query reply".to_string()));
}
}
}
}
}
None => {
return Err(ThinClientError::Other("Event sink closed while waiting for reply".to_string()));
}
}
}
}
pub async fn close_channel(&self, channel_id: u16) -> Result<(), ThinClientError> {
let mut close_channel = BTreeMap::new();
close_channel.insert(Value::Text("channel_id".to_string()), Value::Integer(channel_id.into()));
let mut request = BTreeMap::new();
request.insert(Value::Text("close_channel".to_string()), Value::Map(close_channel));
self.send_cbor_request(request).await
}
}
pub fn find_services(capability: &str, doc: &BTreeMap<Value, Value>) -> Vec<ServiceDescriptor> {
let mut services = Vec::new();
let Some(Value::Array(nodes)) = doc.get(&Value::Text("ServiceNodes".to_string())) else {
println!("❌ No 'ServiceNodes' found in PKI document.");
return services;
};
for node in nodes {
let Value::Bytes(node_bytes) = node else { continue };
let Ok(mynode) = from_slice::<BTreeMap<Value, Value>>(node_bytes) else { continue };
if let Some(Value::Map(details)) = mynode.get(&Value::Text("Kaetzchen".to_string())) {
println!("🔍 Available Capabilities: {:?}", details.keys());
}
let Some(Value::Map(details)) = mynode.get(&Value::Text("Kaetzchen".to_string())) else { continue };
let Some(Value::Map(service)) = details.get(&Value::Text(capability.to_string())) else { continue };
let Some(Value::Text(endpoint)) = service.get(&Value::Text("endpoint".to_string())) else { continue };
println!("returning a service descriptor!");
services.push(ServiceDescriptor {
recipient_queue_id: endpoint.as_bytes().to_vec(),
mix_descriptor: mynode,
});
}
services
}
fn convert_to_pretty_json(value: &Value) -> serde_json::Value {
match value {
Value::Text(s) => serde_json::Value::String(s.clone()),
Value::Integer(i) => json!(*i),
Value::Bytes(b) => json!(hex::encode(b)), Value::Array(arr) => serde_json::Value::Array(arr.iter().map(convert_to_pretty_json).collect()),
Value::Map(map) => {
let converted_map: serde_json::Map<String, serde_json::Value> = map
.iter()
.map(|(key, value)| {
let key_str = match key {
Value::Text(s) => s.clone(),
_ => format!("{:?}", key),
};
(key_str, convert_to_pretty_json(value))
})
.collect();
serde_json::Value::Object(converted_map)
}
_ => serde_json::Value::Null, }
}
fn decode_cbor_nodes(nodes: &[Value]) -> Vec<Value> {
nodes
.iter()
.filter_map(|node| match node {
Value::Bytes(blob) => serde_cbor::from_slice::<BTreeMap<Value, Value>>(blob)
.ok()
.map(Value::Map),
_ => Some(node.clone()), })
.collect()
}
pub fn pretty_print_pki_doc(doc: &BTreeMap<Value, Value>) {
let mut new_doc = BTreeMap::new();
if let Some(Value::Array(gateway_nodes)) = doc.get(&Value::Text("GatewayNodes".to_string())) {
new_doc.insert(Value::Text("GatewayNodes".to_string()), Value::Array(decode_cbor_nodes(gateway_nodes)));
}
if let Some(Value::Array(service_nodes)) = doc.get(&Value::Text("ServiceNodes".to_string())) {
new_doc.insert(Value::Text("ServiceNodes".to_string()), Value::Array(decode_cbor_nodes(service_nodes)));
}
if let Some(Value::Array(topology_layers)) = doc.get(&Value::Text("Topology".to_string())) {
let decoded_topology: Vec<Value> = topology_layers
.iter()
.flat_map(|layer| match layer {
Value::Array(layer_nodes) => decode_cbor_nodes(layer_nodes),
_ => vec![],
})
.collect();
new_doc.insert(Value::Text("Topology".to_string()), Value::Array(decoded_topology));
}
for (key, value) in doc.iter() {
if !matches!(key, Value::Text(s) if ["GatewayNodes", "ServiceNodes", "Topology"].contains(&s.as_str())) {
let key_str = key.clone();
let decoded_value = match value {
Value::Bytes(blob) => serde_cbor::from_slice::<BTreeMap<Value, Value>>(blob)
.ok()
.map(Value::Map)
.unwrap_or(value.clone()), _ => value.clone(),
};
new_doc.insert(key_str, decoded_value);
}
}
let pretty_json = convert_to_pretty_json(&Value::Map(new_doc));
println!("{}", serde_json::to_string_pretty(&pretty_json).unwrap());
}