channel_http/
channel_http.rs1use std::{collections::HashMap, time::Duration};
5
6use reifydb_client::http::{HttpChannelResponse, HttpChannelSession};
7
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9 let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12 if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14 if let Ok(HttpChannelResponse::Auth {
15 request_id,
16 }) = msg.response
17 {
18 println!("Authenticated with ID: {}", request_id);
19 }
20 }
21
22 println!("Sending multiple requests concurrently...");
24
25 let command_id = session.command("MAP{1}", None)?;
27 println!("Command sent with ID: {}", command_id);
28
29 let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31 println!("Query 1 sent with ID: {}", query_id1);
32
33 let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34 println!("Query 2 sent with ID: {}", query_id2);
35
36 let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37 println!("Query 3 sent with ID: {}", query_id3);
38
39 let mut expected_requests = HashMap::new();
41 expected_requests.insert(command_id.clone(), "MAP{1}");
42 expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43 expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44 expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46 println!("\nWaiting for responses (they may arrive out of order)...");
47
48 let mut received = 0;
50 let total_expected = expected_requests.len();
51
52 while received < total_expected {
53 match receiver.recv_timeout(Duration::from_secs(2)) {
54 Ok(msg) => {
55 let request_description =
56 expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58 match msg.response {
59 Ok(HttpChannelResponse::Command {
60 request_id,
61 result,
62 }) => {
63 println!(
64 "ā Received response for {}: Command {} executed ({} frames)",
65 request_description,
66 request_id,
67 result.frames.len()
68 );
69 expected_requests.remove(&request_id);
70 received += 1;
71 }
72 Ok(HttpChannelResponse::Query {
73 request_id,
74 result,
75 }) => {
76 println!(
77 "ā Received response for {}: Query {} executed ({} frames)",
78 request_description,
79 request_id,
80 result.frames.len()
81 );
82
83 if let Some(frame) = result.frames.first() {
86 println!("{frame}");
87 }
88
89 expected_requests.remove(&request_id);
90 received += 1;
91 }
92 Ok(HttpChannelResponse::Auth {
93 ..
94 }) => {
95 }
97 Err(e) => {
98 println!(
99 "ā Request {} ({}) failed: {}",
100 msg.request_id, request_description, e
101 );
102 expected_requests.remove(&msg.request_id);
103 received += 1;
104 }
105 }
106 }
107 Err(_) => {
108 println!("Timeout waiting for responses");
109 if !expected_requests.is_empty() {
110 println!(
111 "Still waiting for: {:?}",
112 expected_requests.values().collect::<Vec<_>>()
113 );
114 }
115 break;
116 }
117 }
118 }
119
120 if expected_requests.is_empty() {
121 println!("\nš All {} requests completed successfully!", total_expected);
122 println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123 println!("and responses were received and processed as they arrived.");
124 } else {
125 println!("\nā ļø {} requests did not complete within timeout", expected_requests.len());
126 }
127
128 Ok(())
129}