querent_synapse/comm/
channel.rs1use crate::comm::types::message::{MessageState, MessageType};
2use pyo3::prelude::*;
3
4use super::IngestedTokens;
5
6pub trait ChannelInterface {
8 fn receive_tokens_in_python(&mut self) -> Option<IngestedTokens>;
10 fn send_tokens_in_rust(&mut self, tokens: IngestedTokens);
12 fn receive_in_python(&mut self) -> Option<MessageState>;
14 fn send_in_rust(&mut self, message_type: MessageType, message_data: MessageState);
16}
17
18#[derive(Clone, Debug)]
20#[pyclass]
21pub struct ChannelHandler {
22 pub token_receiver: Option<crossbeam_channel::Receiver<IngestedTokens>>,
23 pub token_sender: Option<crossbeam_channel::Sender<IngestedTokens>>,
24 pub py_message_receiver: Option<crossbeam_channel::Receiver<(MessageType, MessageState)>>,
25 pub message_sender: Option<crossbeam_channel::Sender<(MessageType, MessageState)>>,
26}
27
28impl ChannelHandler {
29 pub fn new(
31 token_sender: Option<crossbeam_channel::Sender<IngestedTokens>>,
32 token_receiver: Option<crossbeam_channel::Receiver<IngestedTokens>>,
33 py_message_receiver: Option<crossbeam_channel::Receiver<(MessageType, MessageState)>>,
34 message_sender: Option<crossbeam_channel::Sender<(MessageType, MessageState)>>,
35 ) -> Self {
36 ChannelHandler { py_message_receiver, token_sender, token_receiver, message_sender }
37 }
38}
39
40#[derive(Clone, Debug)]
42#[pyclass]
43pub struct PyMessageInterface {
44 channel_handler: ChannelHandler,
46}
47
48#[pymethods]
50impl PyMessageInterface {
51 #[new]
53 pub fn new(channel: ChannelHandler) -> Self {
54 PyMessageInterface { channel_handler: channel }
55 }
56
57 pub fn receive_tokens_in_python(&mut self) -> Option<IngestedTokens> {
59 self.channel_handler.receive_tokens_in_python()
61 }
62
63 pub fn send_tokens_in_rust(&mut self, tokens: IngestedTokens) {
65 self.channel_handler.send_tokens_in_rust(tokens)
67 }
68
69 pub fn receive_in_python(&mut self) -> Option<MessageState> {
71 self.channel_handler.receive_in_python()
73 }
74 pub fn send_in_rust(&mut self, message_type: MessageType, message_data: MessageState) {
76 self.channel_handler.send_in_rust(message_type, message_data)
78 }
79}
80
81impl ChannelInterface for ChannelHandler {
83 fn receive_tokens_in_python(&mut self) -> Option<IngestedTokens> {
84 if self.token_receiver.is_some() {
85 let token = self.token_receiver.as_ref();
86 match token {
87 Some(token) => {
88 let token = token.try_recv();
89 match token {
90 Ok(token) => Some(token),
91 Err(_) => None,
92 }
93 },
94 None => None,
95 }
96 } else {
97 None
98 }
99 }
100
101 fn send_tokens_in_rust(&mut self, tokens: IngestedTokens) {
102 if self.token_sender.is_some() {
103 let token = self.token_sender.as_ref();
104 match token {
105 Some(token) => {
106 let token = token.send(tokens);
107 match token {
108 Ok(_) => (),
109 Err(_) => (),
110 }
111 },
112 None => (),
113 }
114 } else {
115 ()
116 }
117 }
118
119 fn receive_in_python(&mut self) -> Option<MessageState> {
121 if self.py_message_receiver.is_some() {
122 let message = self.py_message_receiver.as_ref();
123 match message {
124 Some(message) => {
125 let message = message.try_recv();
126 match message {
127 Ok(message) => Some(message.1),
128 Err(_) => None,
129 }
130 },
131 None => None,
132 }
133 } else {
134 None
135 }
136 }
137
138 fn send_in_rust(&mut self, message_type: MessageType, message_data: MessageState) {
140 if self.message_sender.is_some() {
141 let message = self.message_sender.as_ref();
142 match message {
143 Some(message) => {
144 let message = message.send((message_type, message_data));
145 match message {
146 Ok(_) => (),
147 Err(_) => (),
148 }
149 },
150 None => (),
151 }
152 } else {
153 ()
154 }
155 }
156}