echo_ping/
echo_ping.rs

1
2use std::env;
3use std::collections::BTreeMap;
4use std::sync::{Arc, Mutex};
5use std::process;
6
7use tokio::time::{timeout, Duration};
8use tokio::runtime::Runtime;
9
10use serde_cbor::Value;
11
12use katzenpost_thin_client::{ThinClient, Config, pretty_print_pki_doc};
13
14
15struct ClientState {
16    reply_message: Arc<Mutex<Option<BTreeMap<Value, Value>>>>,
17    pki_received: Arc<Mutex<bool>>,
18}
19
20impl ClientState {
21    fn new() -> Self {
22        Self {
23            reply_message: Arc::new(Mutex::new(None)),
24            pki_received: Arc::new(Mutex::new(false)),
25        }
26    }
27
28    fn save_reply(&self, reply: &BTreeMap<Value, Value>) {
29        let mut stored_reply = self.reply_message.lock().unwrap();
30        *stored_reply = Some(reply.clone());
31    }
32
33    fn set_pki_received(&self) {
34        let mut pki_flag = self.pki_received.lock().unwrap();
35        *pki_flag = true;
36    }
37
38    fn is_pki_received(&self) -> bool {
39        *self.pki_received.lock().unwrap()
40    }
41
42    fn await_message_reply(&self) -> Option<BTreeMap<Value, Value>> {
43        let stored_reply = self.reply_message.lock().unwrap();
44        stored_reply.clone()
45    }
46}
47
48
49fn main() {
50    let args: Vec<String> = env::args().collect();
51    if args.len() != 2 {
52        eprintln!("Usage: {} <config_path>", args[0]);
53        process::exit(1);
54    }
55    let config_path = &args[1];
56
57    let rt = Runtime::new().unwrap();
58    rt.block_on(run_client(config_path)).unwrap();
59}
60
61async fn run_client(config_path: &str) -> Result<(), Box<dyn std::error::Error>> {
62    let state = Arc::new(ClientState::new());
63    let state_for_reply = Arc::clone(&state);
64    let state_for_pki = Arc::clone(&state);
65
66    let mut cfg = Config::new(config_path)?;
67    cfg.on_new_pki_document = Some(Arc::new(move |_pki_doc| {
68        println!("✅ PKI document received.");
69        state_for_pki.set_pki_received();
70    }));
71    cfg.on_message_reply = Some(Arc::new(move |reply| {
72        println!("📩 Received a reply!");
73        state_for_reply.save_reply(reply);
74    }));
75
76    println!("🚀 Initializing ThinClient...");
77    let client = ThinClient::new(cfg).await?;
78
79    println!("⏳ Waiting for PKI document...");
80    let result = timeout(Duration::from_secs(5), async {
81        loop {
82            if state.is_pki_received() {
83                break;
84            }
85            tokio::task::yield_now().await;
86        }
87    })
88    .await;
89
90    if result.is_err() {
91        return Err("❌ PKI document not received in time.".into());
92    }
93
94    println!("✅ Pretty printing PKI document:");
95    let doc = client.pki_document().await;
96    pretty_print_pki_doc(&doc);
97    println!("AFTER Pretty printing PKI document");
98
99
100    let service_desc = client.get_service("echo").await?;
101    println!("got service descriptor for echo service");
102
103    let surb_id = ThinClient::new_surb_id();
104    let payload = b"hello".to_vec();
105    let (dest_node, dest_queue) = service_desc.to_destination();
106
107    println!("before calling send_message");
108    client.send_message(surb_id, &payload, dest_node, dest_queue).await?;
109    println!("after calling send_message");
110    
111    println!("⏳ Waiting for message reply...");
112    let state_for_reply_wait = Arc::clone(&state);
113
114    let result = timeout(Duration::from_secs(5), async move {
115        loop {
116            if let Some(reply) = state_for_reply_wait.await_message_reply() {
117                if let Some(Value::Bytes(payload2)) = reply.get(&Value::Text("payload".to_string())) {
118                    let payload2 = &payload2[..payload.len()];
119                    assert_eq!(payload, payload2, "Reply does not match payload!");
120                    println!("✅ Received valid reply, stopping client.");
121                    return Ok::<(), Box<dyn std::error::Error>>(());
122                }
123            }
124            tokio::task::yield_now().await;
125        }
126    }).await;
127
128    result.map_err(|e| Box::new(e))??;
129    client.stop().await;
130    println!("✅ Client stopped successfully.");
131    Ok(())
132}