reifydb_client/ws/session/
channel.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::sync::{Arc, mpsc};
5
6use super::ResponseMessage;
7use crate::{
8	AuthRequest, CommandRequest, Params, QueryRequest, Request, RequestPayload,
9	utils::generate_request_id,
10	ws::{
11		client::ClientInner,
12		message::{InternalMessage, ResponseRoute},
13	},
14};
15
16/// A channel-based session for message-passing style communication
17pub struct ChannelSession {
18	client: Arc<ClientInner>,
19	token: Option<String>,
20	response_tx: mpsc::Sender<ResponseMessage>,
21}
22
23impl ChannelSession {
24	/// Create a new channel session
25	pub(crate) fn new(
26		client: Arc<ClientInner>,
27		token: Option<String>,
28	) -> Result<(Self, mpsc::Receiver<ResponseMessage>), reifydb_type::Error> {
29		let (tx, rx) = mpsc::channel();
30
31		let session = Self {
32			client: client.clone(),
33			token: token.clone(),
34			response_tx: tx,
35		};
36
37		// Authenticate if token provided
38		if token.is_some() {
39			let _ = session.authenticate();
40		}
41
42		Ok((session, rx))
43	}
44
45	/// Authenticate (response arrives on channel)
46	fn authenticate(&self) -> Result<String, reifydb_type::Error> {
47		if self.token.is_none() {
48			return Ok(String::new());
49		}
50
51		let id = generate_request_id();
52
53		let request = Request {
54			id: id.clone(),
55			payload: RequestPayload::Auth(AuthRequest {
56				token: self.token.clone(),
57			}),
58		};
59
60		if let Err(e) = self.client.command_tx.send(InternalMessage::Request {
61			id: id.clone(),
62			request,
63			route: ResponseRoute::Channel(self.response_tx.clone()),
64		}) {
65			panic!("Failed to send request: {}", e);
66		}
67
68		Ok(id)
69	}
70
71	/// Send a command (response arrives on channel)
72	pub fn command(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
73		let id = generate_request_id();
74
75		let request = Request {
76			id: id.clone(),
77			payload: RequestPayload::Command(CommandRequest {
78				statements: vec![rql.to_string()],
79				params,
80			}),
81		};
82
83		if let Err(e) = self.client.command_tx.send(InternalMessage::Request {
84			id: id.clone(),
85			request,
86			route: ResponseRoute::Channel(self.response_tx.clone()),
87		}) {
88			panic!("Failed to send request: {}", e);
89		}
90
91		Ok(id)
92	}
93
94	/// Send a query (response arrives on channel)
95	pub fn query(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
96		let id = generate_request_id();
97
98		let request = Request {
99			id: id.clone(),
100			payload: RequestPayload::Query(QueryRequest {
101				statements: vec![rql.to_string()],
102				params,
103			}),
104		};
105
106		if let Err(e) = self.client.command_tx.send(InternalMessage::Request {
107			id: id.clone(),
108			request,
109			route: ResponseRoute::Channel(self.response_tx.clone()),
110		}) {
111			panic!("Failed to send request: {}", e);
112		}
113
114		Ok(id)
115	}
116}