reifydb_client/http/session/
channel.rs1use std::{sync::mpsc, time::Instant};
5
6use reifydb_type::{Error, diagnostic::internal};
7
8use crate::{
9 CommandRequest, Params, QueryRequest,
10 http::{
11 client::HttpClient,
12 message::{HttpInternalMessage, HttpResponseRoute},
13 },
14 utils::generate_request_id,
15};
16
17#[derive(Debug)]
19pub enum HttpChannelResponse {
20 Auth {
22 request_id: String,
23 },
24 Command {
26 request_id: String,
27 result: crate::session::CommandResult,
28 },
29 Query {
31 request_id: String,
32 result: crate::session::QueryResult,
33 },
34}
35
36#[derive(Debug)]
38pub struct HttpResponseMessage {
39 pub request_id: String,
40 pub response: Result<HttpChannelResponse, Error>,
41 pub timestamp: Instant,
42}
43
44pub struct HttpChannelSession {
46 client: HttpClient,
47 token: Option<String>,
48 response_tx: mpsc::Sender<HttpResponseMessage>,
49}
50
51impl HttpChannelSession {
52 pub fn new(
54 host: &str,
55 port: u16,
56 token: Option<String>,
57 ) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
58 let client = HttpClient::new((host, port))
59 .map_err(|e| Error(internal(format!("Failed to create client: {}", e))))?;
60 Self::from_client(client, token)
61 }
62
63 pub fn from_client(
65 client: HttpClient,
66 token: Option<String>,
67 ) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
68 let (tx, rx) = mpsc::channel();
69
70 let session = Self {
71 client,
72 token: token.clone(),
73 response_tx: tx,
74 };
75
76 if token.is_some() {
78 let _ = session.authenticate();
79 }
80
81 Ok((session, rx))
82 }
83
84 pub fn from_url(
86 url: &str,
87 token: Option<String>,
88 ) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
89 let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
90 Self::from_client(client, token)
91 }
92
93 fn authenticate(&self) -> Result<String, Error> {
95 if self.token.is_none() {
96 return Ok(String::new());
97 }
98
99 let id = generate_request_id();
100
101 if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Auth {
103 id: id.clone(),
104 _token: self.token.clone(),
105 route: HttpResponseRoute::Channel(self.response_tx.clone()),
106 }) {
107 return Err(Error(internal(format!("Failed to send auth request: {}", e))));
108 }
109
110 Ok(id)
111 }
112
113 pub fn command(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
115 let id = generate_request_id();
116
117 let request = CommandRequest {
118 statements: vec![rql.to_string()],
119 params,
120 };
121
122 if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Command {
124 id: id.clone(),
125 request,
126 route: HttpResponseRoute::Channel(self.response_tx.clone()),
127 }) {
128 return Err(format!("Failed to send command request: {}", e).into());
129 }
130
131 Ok(id)
132 }
133
134 pub fn query(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
136 let id = generate_request_id();
137
138 let request = QueryRequest {
139 statements: vec![rql.to_string()],
140 params,
141 };
142
143 if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Query {
145 id: id.clone(),
146 request,
147 route: HttpResponseRoute::Channel(self.response_tx.clone()),
148 }) {
149 return Err(format!("Failed to send query request: {}", e).into());
150 }
151
152 Ok(id)
153 }
154}