pub struct HttpChannelSession { /* private fields */ }
A channel-based HTTP session for message-passing style communication.
Implementations§
Source§impl HttpChannelSession
impl HttpChannelSession
Sourcepub fn new(
host: &str,
port: u16,
token: Option<String>,
) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
pub fn new( host: &str, port: u16, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
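Create a new channel HTTP session for the server at the given host and port. On success, returns the session together with the receiver on which response messages are delivered.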
Examples found in repository?
examples/channel_http.rs (line 10)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9 // Connect to ReifyDB HTTP server
10 let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12 // Consume authentication response
13 if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14 if let Ok(HttpChannelResponse::Auth {
15 request_id,
16 }) = msg.response
17 {
18 println!("Authenticated with ID: {}", request_id);
19 }
20 }
21
22 // Send multiple requests asynchronously to demonstrate multiplexing
23 println!("Sending multiple requests concurrently...");
24
25 // Send a command
26 let command_id = session.command("MAP{1}", None)?;
27 println!("Command sent with ID: {}", command_id);
28
29 // Send multiple queries concurrently
30 let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31 println!("Query 1 sent with ID: {}", query_id1);
32
33 let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34 println!("Query 2 sent with ID: {}", query_id2);
35
36 let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37 println!("Query 3 sent with ID: {}", query_id3);
38
39 // Track which requests we're expecting
40 let mut expected_requests = HashMap::new();
41 expected_requests.insert(command_id.clone(), "MAP{1}");
42 expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43 expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44 expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46 println!("\nWaiting for responses (they may arrive out of order)...");
47
48 // Receive responses asynchronously - they may arrive in any order
49 let mut received = 0;
50 let total_expected = expected_requests.len();
51
52 while received < total_expected {
53 match receiver.recv_timeout(Duration::from_secs(2)) {
54 Ok(msg) => {
55 let request_description =
56 expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58 match msg.response {
59 Ok(HttpChannelResponse::Command {
60 request_id,
61 result,
62 }) => {
63 println!(
64 "✓ Received response for {}: Command {} executed ({} frames)",
65 request_description,
66 request_id,
67 result.frames.len()
68 );
69 expected_requests.remove(&request_id);
70 received += 1;
71 }
72 Ok(HttpChannelResponse::Query {
73 request_id,
74 result,
75 }) => {
76 println!(
77 "✓ Received response for {}: Query {} executed ({} frames)",
78 request_description,
79 request_id,
80 result.frames.len()
81 );
82
83 // Print first frame if
84 // available
85 if let Some(frame) = result.frames.first() {
86 println!("{frame}");
87 }
88
89 expected_requests.remove(&request_id);
90 received += 1;
91 }
92 Ok(HttpChannelResponse::Auth {
93 ..
94 }) => {
95 // Already handled above
96 }
97 Err(e) => {
98 println!(
99 "✗ Request {} ({}) failed: {}",
100 msg.request_id, request_description, e
101 );
102 expected_requests.remove(&msg.request_id);
103 received += 1;
104 }
105 }
106 }
107 Err(_) => {
108 println!("Timeout waiting for responses");
109 if !expected_requests.is_empty() {
110 println!(
111 "Still waiting for: {:?}",
112 expected_requests.values().collect::<Vec<_>>()
113 );
114 }
115 break;
116 }
117 }
118 }
119
120 if expected_requests.is_empty() {
121 println!("\n🎉 All {} requests completed successfully!", total_expected);
122 println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123 println!("and responses were received and processed as they arrived.");
124 } else {
125 println!("\n⚠️ {} requests did not complete within timeout", expected_requests.len());
126 }
127
128 Ok(())
129}
Sourcepub fn from_client(
client: HttpClient,
token: Option<String>,
) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
pub fn from_client( client: HttpClient, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
Create a new channel HTTP session from an existing client
Sourcepub fn from_url(
url: &str,
token: Option<String>,
) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
pub fn from_url( url: &str, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>
Create a new channel HTTP session from a URL (e.g., “http://localhost:8080”)
Sourcepub fn command(
&self,
rql: &str,
params: Option<Params>,
) -> Result<String, Box<dyn Error>>
pub fn command( &self, rql: &str, params: Option<Params>, ) -> Result<String, Box<dyn Error>>
Send a command and return its request ID; the response arrives later on the channel and can be matched by that ID
Examples found in repository?
examples/channel_http.rs (line 26)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9 // Connect to ReifyDB HTTP server
10 let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12 // Consume authentication response
13 if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14 if let Ok(HttpChannelResponse::Auth {
15 request_id,
16 }) = msg.response
17 {
18 println!("Authenticated with ID: {}", request_id);
19 }
20 }
21
22 // Send multiple requests asynchronously to demonstrate multiplexing
23 println!("Sending multiple requests concurrently...");
24
25 // Send a command
26 let command_id = session.command("MAP{1}", None)?;
27 println!("Command sent with ID: {}", command_id);
28
29 // Send multiple queries concurrently
30 let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31 println!("Query 1 sent with ID: {}", query_id1);
32
33 let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34 println!("Query 2 sent with ID: {}", query_id2);
35
36 let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37 println!("Query 3 sent with ID: {}", query_id3);
38
39 // Track which requests we're expecting
40 let mut expected_requests = HashMap::new();
41 expected_requests.insert(command_id.clone(), "MAP{1}");
42 expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43 expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44 expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46 println!("\nWaiting for responses (they may arrive out of order)...");
47
48 // Receive responses asynchronously - they may arrive in any order
49 let mut received = 0;
50 let total_expected = expected_requests.len();
51
52 while received < total_expected {
53 match receiver.recv_timeout(Duration::from_secs(2)) {
54 Ok(msg) => {
55 let request_description =
56 expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58 match msg.response {
59 Ok(HttpChannelResponse::Command {
60 request_id,
61 result,
62 }) => {
63 println!(
64 "✓ Received response for {}: Command {} executed ({} frames)",
65 request_description,
66 request_id,
67 result.frames.len()
68 );
69 expected_requests.remove(&request_id);
70 received += 1;
71 }
72 Ok(HttpChannelResponse::Query {
73 request_id,
74 result,
75 }) => {
76 println!(
77 "✓ Received response for {}: Query {} executed ({} frames)",
78 request_description,
79 request_id,
80 result.frames.len()
81 );
82
83 // Print first frame if
84 // available
85 if let Some(frame) = result.frames.first() {
86 println!("{frame}");
87 }
88
89 expected_requests.remove(&request_id);
90 received += 1;
91 }
92 Ok(HttpChannelResponse::Auth {
93 ..
94 }) => {
95 // Already handled above
96 }
97 Err(e) => {
98 println!(
99 "✗ Request {} ({}) failed: {}",
100 msg.request_id, request_description, e
101 );
102 expected_requests.remove(&msg.request_id);
103 received += 1;
104 }
105 }
106 }
107 Err(_) => {
108 println!("Timeout waiting for responses");
109 if !expected_requests.is_empty() {
110 println!(
111 "Still waiting for: {:?}",
112 expected_requests.values().collect::<Vec<_>>()
113 );
114 }
115 break;
116 }
117 }
118 }
119
120 if expected_requests.is_empty() {
121 println!("\n🎉 All {} requests completed successfully!", total_expected);
122 println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123 println!("and responses were received and processed as they arrived.");
124 } else {
125 println!("\n⚠️ {} requests did not complete within timeout", expected_requests.len());
126 }
127
128 Ok(())
129}
Sourcepub fn query(
&self,
rql: &str,
params: Option<Params>,
) -> Result<String, Box<dyn Error>>
pub fn query( &self, rql: &str, params: Option<Params>, ) -> Result<String, Box<dyn Error>>
Send a query and return its request ID; the response arrives later on the channel and can be matched by that ID
Examples found in repository?
examples/channel_http.rs (line 30)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9 // Connect to ReifyDB HTTP server
10 let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12 // Consume authentication response
13 if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14 if let Ok(HttpChannelResponse::Auth {
15 request_id,
16 }) = msg.response
17 {
18 println!("Authenticated with ID: {}", request_id);
19 }
20 }
21
22 // Send multiple requests asynchronously to demonstrate multiplexing
23 println!("Sending multiple requests concurrently...");
24
25 // Send a command
26 let command_id = session.command("MAP{1}", None)?;
27 println!("Command sent with ID: {}", command_id);
28
29 // Send multiple queries concurrently
30 let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31 println!("Query 1 sent with ID: {}", query_id1);
32
33 let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34 println!("Query 2 sent with ID: {}", query_id2);
35
36 let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37 println!("Query 3 sent with ID: {}", query_id3);
38
39 // Track which requests we're expecting
40 let mut expected_requests = HashMap::new();
41 expected_requests.insert(command_id.clone(), "MAP{1}");
42 expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43 expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44 expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46 println!("\nWaiting for responses (they may arrive out of order)...");
47
48 // Receive responses asynchronously - they may arrive in any order
49 let mut received = 0;
50 let total_expected = expected_requests.len();
51
52 while received < total_expected {
53 match receiver.recv_timeout(Duration::from_secs(2)) {
54 Ok(msg) => {
55 let request_description =
56 expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58 match msg.response {
59 Ok(HttpChannelResponse::Command {
60 request_id,
61 result,
62 }) => {
63 println!(
64 "✓ Received response for {}: Command {} executed ({} frames)",
65 request_description,
66 request_id,
67 result.frames.len()
68 );
69 expected_requests.remove(&request_id);
70 received += 1;
71 }
72 Ok(HttpChannelResponse::Query {
73 request_id,
74 result,
75 }) => {
76 println!(
77 "✓ Received response for {}: Query {} executed ({} frames)",
78 request_description,
79 request_id,
80 result.frames.len()
81 );
82
83 // Print first frame if
84 // available
85 if let Some(frame) = result.frames.first() {
86 println!("{frame}");
87 }
88
89 expected_requests.remove(&request_id);
90 received += 1;
91 }
92 Ok(HttpChannelResponse::Auth {
93 ..
94 }) => {
95 // Already handled above
96 }
97 Err(e) => {
98 println!(
99 "✗ Request {} ({}) failed: {}",
100 msg.request_id, request_description, e
101 );
102 expected_requests.remove(&msg.request_id);
103 received += 1;
104 }
105 }
106 }
107 Err(_) => {
108 println!("Timeout waiting for responses");
109 if !expected_requests.is_empty() {
110 println!(
111 "Still waiting for: {:?}",
112 expected_requests.values().collect::<Vec<_>>()
113 );
114 }
115 break;
116 }
117 }
118 }
119
120 if expected_requests.is_empty() {
121 println!("\n🎉 All {} requests completed successfully!", total_expected);
122 println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123 println!("and responses were received and processed as they arrived.");
124 } else {
125 println!("\n⚠️ {} requests did not complete within timeout", expected_requests.len());
126 }
127
128 Ok(())
129}
Auto Trait Implementations§
impl Freeze for HttpChannelSession
impl RefUnwindSafe for HttpChannelSession
impl Send for HttpChannelSession
impl Sync for HttpChannelSession
impl Unpin for HttpChannelSession
impl UnwindSafe for HttpChannelSession
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more