HttpChannelSession

Struct HttpChannelSession 

Source
pub struct HttpChannelSession { /* private fields */ }
Expand description

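A channel-based HTTP session for message-passing-style communication: requests are sent through the session and their responses are delivered on the `Receiver` returned by the constructor.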

Implementations§

Source§

impl HttpChannelSession

Source

pub fn new( host: &str, port: u16, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>

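Create a new channel HTTP session for the server at `host`:`port`, with an optional authentication `token`. Returns the session together with the `Receiver` on which responses are delivered.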

Examples found in repository?
examples/channel_http.rs (line 10)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9	// Connect to ReifyDB HTTP server
10	let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12	// Consume authentication response
13	if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14		if let Ok(HttpChannelResponse::Auth {
15			request_id,
16		}) = msg.response
17		{
18			println!("Authenticated with ID: {}", request_id);
19		}
20	}
21
22	// Send multiple requests asynchronously to demonstrate multiplexing
23	println!("Sending multiple requests concurrently...");
24
25	// Send a command
26	let command_id = session.command("MAP{1}", None)?;
27	println!("Command sent with ID: {}", command_id);
28
29	// Send multiple queries concurrently
30	let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31	println!("Query 1 sent with ID: {}", query_id1);
32
33	let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34	println!("Query 2 sent with ID: {}", query_id2);
35
36	let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37	println!("Query 3 sent with ID: {}", query_id3);
38
39	// Track which requests we're expecting
40	let mut expected_requests = HashMap::new();
41	expected_requests.insert(command_id.clone(), "MAP{1}");
42	expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43	expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44	expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46	println!("\nWaiting for responses (they may arrive out of order)...");
47
48	// Receive responses asynchronously - they may arrive in any order
49	let mut received = 0;
50	let total_expected = expected_requests.len();
51
52	while received < total_expected {
53		match receiver.recv_timeout(Duration::from_secs(2)) {
54			Ok(msg) => {
55				let request_description =
56					expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58				match msg.response {
59					Ok(HttpChannelResponse::Command {
60						request_id,
61						result,
62					}) => {
63						println!(
64							"✓ Received response for {}: Command {} executed ({} frames)",
65							request_description,
66							request_id,
67							result.frames.len()
68						);
69						expected_requests.remove(&request_id);
70						received += 1;
71					}
72					Ok(HttpChannelResponse::Query {
73						request_id,
74						result,
75					}) => {
76						println!(
77							"✓ Received response for {}: Query {} executed ({} frames)",
78							request_description,
79							request_id,
80							result.frames.len()
81						);
82
83						// Print first frame if
84						// available
85						if let Some(frame) = result.frames.first() {
86							println!("{frame}");
87						}
88
89						expected_requests.remove(&request_id);
90						received += 1;
91					}
92					Ok(HttpChannelResponse::Auth {
93						..
94					}) => {
95						// Already handled above
96					}
97					Err(e) => {
98						println!(
99							"✗ Request {} ({}) failed: {}",
100							msg.request_id, request_description, e
101						);
102						expected_requests.remove(&msg.request_id);
103						received += 1;
104					}
105				}
106			}
107			Err(_) => {
108				println!("Timeout waiting for responses");
109				if !expected_requests.is_empty() {
110					println!(
111						"Still waiting for: {:?}",
112						expected_requests.values().collect::<Vec<_>>()
113					);
114				}
115				break;
116			}
117		}
118	}
119
120	if expected_requests.is_empty() {
121		println!("\n🎉 All {} requests completed successfully!", total_expected);
122		println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123		println!("and responses were received and processed as they arrived.");
124	} else {
125		println!("\n⚠️  {} requests did not complete within timeout", expected_requests.len());
126	}
127
128	Ok(())
129}
Source

pub fn from_client( client: HttpClient, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>

Create a new channel HTTP session from an existing client

Source

pub fn from_url( url: &str, token: Option<String>, ) -> Result<(Self, Receiver<HttpResponseMessage>), Error>

Create a new channel HTTP session from a URL (e.g., “http://localhost:8080”)

Source

pub fn command( &self, rql: &str, params: Option<Params>, ) -> Result<String, Box<dyn Error>>

Send a command and return its request ID; the response arrives asynchronously on the channel, tagged with the same request ID.

Examples found in repository?
examples/channel_http.rs (line 26)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9	// Connect to ReifyDB HTTP server
10	let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12	// Consume authentication response
13	if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14		if let Ok(HttpChannelResponse::Auth {
15			request_id,
16		}) = msg.response
17		{
18			println!("Authenticated with ID: {}", request_id);
19		}
20	}
21
22	// Send multiple requests asynchronously to demonstrate multiplexing
23	println!("Sending multiple requests concurrently...");
24
25	// Send a command
26	let command_id = session.command("MAP{1}", None)?;
27	println!("Command sent with ID: {}", command_id);
28
29	// Send multiple queries concurrently
30	let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31	println!("Query 1 sent with ID: {}", query_id1);
32
33	let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34	println!("Query 2 sent with ID: {}", query_id2);
35
36	let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37	println!("Query 3 sent with ID: {}", query_id3);
38
39	// Track which requests we're expecting
40	let mut expected_requests = HashMap::new();
41	expected_requests.insert(command_id.clone(), "MAP{1}");
42	expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43	expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44	expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46	println!("\nWaiting for responses (they may arrive out of order)...");
47
48	// Receive responses asynchronously - they may arrive in any order
49	let mut received = 0;
50	let total_expected = expected_requests.len();
51
52	while received < total_expected {
53		match receiver.recv_timeout(Duration::from_secs(2)) {
54			Ok(msg) => {
55				let request_description =
56					expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58				match msg.response {
59					Ok(HttpChannelResponse::Command {
60						request_id,
61						result,
62					}) => {
63						println!(
64							"✓ Received response for {}: Command {} executed ({} frames)",
65							request_description,
66							request_id,
67							result.frames.len()
68						);
69						expected_requests.remove(&request_id);
70						received += 1;
71					}
72					Ok(HttpChannelResponse::Query {
73						request_id,
74						result,
75					}) => {
76						println!(
77							"✓ Received response for {}: Query {} executed ({} frames)",
78							request_description,
79							request_id,
80							result.frames.len()
81						);
82
83						// Print first frame if
84						// available
85						if let Some(frame) = result.frames.first() {
86							println!("{frame}");
87						}
88
89						expected_requests.remove(&request_id);
90						received += 1;
91					}
92					Ok(HttpChannelResponse::Auth {
93						..
94					}) => {
95						// Already handled above
96					}
97					Err(e) => {
98						println!(
99							"✗ Request {} ({}) failed: {}",
100							msg.request_id, request_description, e
101						);
102						expected_requests.remove(&msg.request_id);
103						received += 1;
104					}
105				}
106			}
107			Err(_) => {
108				println!("Timeout waiting for responses");
109				if !expected_requests.is_empty() {
110					println!(
111						"Still waiting for: {:?}",
112						expected_requests.values().collect::<Vec<_>>()
113					);
114				}
115				break;
116			}
117		}
118	}
119
120	if expected_requests.is_empty() {
121		println!("\n🎉 All {} requests completed successfully!", total_expected);
122		println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123		println!("and responses were received and processed as they arrived.");
124	} else {
125		println!("\n⚠️  {} requests did not complete within timeout", expected_requests.len());
126	}
127
128	Ok(())
129}
Source

pub fn query( &self, rql: &str, params: Option<Params>, ) -> Result<String, Box<dyn Error>>

Send a query and return its request ID; the response arrives asynchronously on the channel, tagged with the same request ID.

Examples found in repository?
examples/channel_http.rs (line 30)
8fn main() -> Result<(), Box<dyn std::error::Error>> {
9	// Connect to ReifyDB HTTP server
10	let (session, receiver) = HttpChannelSession::new("127.0.0.1", 8090, Some("mysecrettoken".to_string()))?;
11
12	// Consume authentication response
13	if let Ok(msg) = receiver.recv_timeout(Duration::from_millis(100)) {
14		if let Ok(HttpChannelResponse::Auth {
15			request_id,
16		}) = msg.response
17		{
18			println!("Authenticated with ID: {}", request_id);
19		}
20	}
21
22	// Send multiple requests asynchronously to demonstrate multiplexing
23	println!("Sending multiple requests concurrently...");
24
25	// Send a command
26	let command_id = session.command("MAP{1}", None)?;
27	println!("Command sent with ID: {}", command_id);
28
29	// Send multiple queries concurrently
30	let query_id1 = session.query("MAP { x: 42, y: 'hello' }", None)?;
31	println!("Query 1 sent with ID: {}", query_id1);
32
33	let query_id2 = session.query("MAP { a: 123, b: 'world' }", None)?;
34	println!("Query 2 sent with ID: {}", query_id2);
35
36	let query_id3 = session.query("MAP { count: 999, active: true }", None)?;
37	println!("Query 3 sent with ID: {}", query_id3);
38
39	// Track which requests we're expecting
40	let mut expected_requests = HashMap::new();
41	expected_requests.insert(command_id.clone(), "MAP{1}");
42	expected_requests.insert(query_id1.clone(), "Query 1 (x: 42, y: 'hello')");
43	expected_requests.insert(query_id2.clone(), "Query 2 (a: 123, b: 'world')");
44	expected_requests.insert(query_id3.clone(), "Query 3 (count: 999, active: true)");
45
46	println!("\nWaiting for responses (they may arrive out of order)...");
47
48	// Receive responses asynchronously - they may arrive in any order
49	let mut received = 0;
50	let total_expected = expected_requests.len();
51
52	while received < total_expected {
53		match receiver.recv_timeout(Duration::from_secs(2)) {
54			Ok(msg) => {
55				let request_description =
56					expected_requests.get(&msg.request_id).unwrap_or(&"Unknown request");
57
58				match msg.response {
59					Ok(HttpChannelResponse::Command {
60						request_id,
61						result,
62					}) => {
63						println!(
64							"✓ Received response for {}: Command {} executed ({} frames)",
65							request_description,
66							request_id,
67							result.frames.len()
68						);
69						expected_requests.remove(&request_id);
70						received += 1;
71					}
72					Ok(HttpChannelResponse::Query {
73						request_id,
74						result,
75					}) => {
76						println!(
77							"✓ Received response for {}: Query {} executed ({} frames)",
78							request_description,
79							request_id,
80							result.frames.len()
81						);
82
83						// Print first frame if
84						// available
85						if let Some(frame) = result.frames.first() {
86							println!("{frame}");
87						}
88
89						expected_requests.remove(&request_id);
90						received += 1;
91					}
92					Ok(HttpChannelResponse::Auth {
93						..
94					}) => {
95						// Already handled above
96					}
97					Err(e) => {
98						println!(
99							"✗ Request {} ({}) failed: {}",
100							msg.request_id, request_description, e
101						);
102						expected_requests.remove(&msg.request_id);
103						received += 1;
104					}
105				}
106			}
107			Err(_) => {
108				println!("Timeout waiting for responses");
109				if !expected_requests.is_empty() {
110					println!(
111						"Still waiting for: {:?}",
112						expected_requests.values().collect::<Vec<_>>()
113					);
114				}
115				break;
116			}
117		}
118	}
119
120	if expected_requests.is_empty() {
121		println!("\n🎉 All {} requests completed successfully!", total_expected);
122		println!("This demonstrates async HTTP multiplexing - requests were sent concurrently");
123		println!("and responses were received and processed as they arrived.");
124	} else {
125		println!("\n⚠️  {} requests did not complete within timeout", expected_requests.len());
126	}
127
128	Ok(())
129}

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.