feagi_io/protocol_implementations/zmq/
client_implementations.rs1use std::env;
13use zmq::{Context, Message, Socket};
14
15use crate::protocol_implementations::zmq::shared::ZmqUrl;
16use crate::traits_and_enums::client::{
17 FeagiClient, FeagiClientPusher, FeagiClientPusherProperties, FeagiClientRequester,
18 FeagiClientRequesterProperties, FeagiClientSubscriber, FeagiClientSubscriberProperties,
19};
20use crate::traits_and_enums::shared::{FeagiEndpointState, TransportProtocolEndpoint};
21use crate::FeagiNetworkError;
22
23fn parse_bool_env(name: &str) -> Result<Option<bool>, FeagiNetworkError> {
24 let Ok(raw) = env::var(name) else {
25 return Ok(None);
26 };
27 let normalized = raw.trim().to_ascii_lowercase();
28 let value = match normalized.as_str() {
29 "1" | "true" | "yes" | "on" => true,
30 "0" | "false" | "no" | "off" => false,
31 _ => {
32 return Err(FeagiNetworkError::InvalidSocketProperties(format!(
33 "Invalid boolean value for {name}: '{raw}'"
34 )));
35 }
36 };
37 Ok(Some(value))
38}
39
40fn parse_i32_env(name: &str) -> Result<Option<i32>, FeagiNetworkError> {
41 let Ok(raw) = env::var(name) else {
42 return Ok(None);
43 };
44 let parsed = raw.trim().parse::<i32>().map_err(|_| {
45 FeagiNetworkError::InvalidSocketProperties(format!(
46 "Invalid integer value for {name}: '{raw}'"
47 ))
48 })?;
49 Ok(Some(parsed))
50}
51
52fn apply_common_client_zmq_tuning(socket: &Socket) -> Result<(), FeagiNetworkError> {
53 if let Some(sndhwm) = parse_i32_env("FEAGI_ZMQ_SNDHWM")? {
54 socket
55 .set_sndhwm(sndhwm)
56 .map_err(|e| FeagiNetworkError::InvalidSocketProperties(e.to_string()))?;
57 }
58 if let Some(linger_ms) = parse_i32_env("FEAGI_ZMQ_LINGER_MS")? {
59 socket
60 .set_linger(linger_ms)
61 .map_err(|e| FeagiNetworkError::InvalidSocketProperties(e.to_string()))?;
62 }
63 if let Some(immediate) = parse_bool_env("FEAGI_ZMQ_IMMEDIATE")? {
64 socket
65 .set_immediate(immediate)
66 .map_err(|e| FeagiNetworkError::InvalidSocketProperties(e.to_string()))?;
67 }
68 Ok(())
69}
70
71fn apply_subscriber_zmq_tuning(socket: &Socket) -> Result<(), FeagiNetworkError> {
72 if let Some(rcvhwm) = parse_i32_env("FEAGI_ZMQ_RCVHWM")? {
73 socket
74 .set_rcvhwm(rcvhwm)
75 .map_err(|e| FeagiNetworkError::InvalidSocketProperties(e.to_string()))?;
76 }
77 Ok(())
78}
79
80#[derive(Debug, Clone, PartialEq)]
91pub struct FeagiZmqClientSubscriberProperties {
92 server_address: ZmqUrl,
93}
94
95impl FeagiZmqClientSubscriberProperties {
96 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
106 let zmq_url = ZmqUrl::new(server_address)?;
107 Ok(Self {
108 server_address: zmq_url,
109 })
110 }
111}
112
113impl FeagiClientSubscriberProperties for FeagiZmqClientSubscriberProperties {
114 fn as_boxed_client_subscriber(&self) -> Box<dyn FeagiClientSubscriber> {
115 let context = Context::new();
116 let socket = context
117 .socket(zmq::SUB)
118 .expect("Failed to create ZMQ SUB socket");
119
120 socket
122 .set_subscribe(b"")
123 .expect("Failed to set ZMQ subscription filter");
124
125 Box::new(FeagiZmqClientSubscriber {
126 server_address: self.server_address.clone(),
127 current_state: FeagiEndpointState::Inactive,
128 context,
129 socket,
130 recv_msg: Message::new(),
131 has_data: false,
132 })
133 }
134
135 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
136 TransportProtocolEndpoint::Zmq(self.server_address.clone())
137 }
138}
139
140pub struct FeagiZmqClientSubscriber {
167 server_address: ZmqUrl,
168 current_state: FeagiEndpointState,
169 #[allow(dead_code)]
170 context: Context,
171 socket: Socket,
172 recv_msg: Message,
174 has_data: bool,
176}
177
178impl FeagiClient for FeagiZmqClientSubscriber {
179 fn poll(&mut self) -> &FeagiEndpointState {
180 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
181 match self.socket.recv(&mut self.recv_msg, zmq::DONTWAIT) {
182 Ok(()) => {
183 self.has_data = true;
184 self.current_state = FeagiEndpointState::ActiveHasData;
185 }
186 Err(zmq::Error::EAGAIN) => {
187 }
189 Err(e) => {
190 self.current_state = FeagiEndpointState::Errored(
191 FeagiNetworkError::ReceiveFailed(e.to_string()),
192 );
193 }
194 }
195 }
196 &self.current_state
197 }
198
199 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
200 match &self.current_state {
201 FeagiEndpointState::Inactive => {
202 apply_common_client_zmq_tuning(&self.socket)?;
203 apply_subscriber_zmq_tuning(&self.socket)?;
204 self.socket
205 .connect(&self.server_address.to_string())
206 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
207
208 self.current_state = FeagiEndpointState::ActiveWaiting;
209 Ok(())
210 }
211 _ => Err(FeagiNetworkError::InvalidSocketProperties(
212 "Cannot connect: client is not in Inactive state".to_string(),
213 )),
214 }
215 }
216
217 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
218 match &self.current_state {
219 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
220 self.socket
221 .disconnect(&self.server_address.to_string())
222 .map_err(|e| FeagiNetworkError::CannotDisconnect(e.to_string()))?;
223
224 self.has_data = false;
225 self.current_state = FeagiEndpointState::Inactive;
226 Ok(())
227 }
228 _ => Err(FeagiNetworkError::InvalidSocketProperties(
229 "Cannot disconnect: client is not in Active state".to_string(),
230 )),
231 }
232 }
233
234 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
235 match &self.current_state {
236 FeagiEndpointState::Errored(_) => {
237 let _ = self.socket.disconnect(&self.server_address.to_string());
238 self.has_data = false;
239 self.current_state = FeagiEndpointState::Inactive;
240 Ok(())
241 }
242 _ => Err(FeagiNetworkError::InvalidSocketProperties(
243 "Cannot confirm error: client is not in Errored state".to_string(),
244 )),
245 }
246 }
247
248 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
249 TransportProtocolEndpoint::Zmq(self.server_address.clone())
250 }
251}
252
253impl FeagiClientSubscriber for FeagiZmqClientSubscriber {
254 fn consume_retrieved_data(&mut self) -> Result<&[u8], FeagiNetworkError> {
255 match &self.current_state {
256 FeagiEndpointState::ActiveHasData => {
257 if self.has_data {
258 self.has_data = false;
259 self.current_state = FeagiEndpointState::ActiveWaiting;
260 Ok(&self.recv_msg)
261 } else {
262 Err(FeagiNetworkError::ReceiveFailed(
263 "No data available despite ActiveHasData state".to_string(),
264 ))
265 }
266 }
267 _ => Err(FeagiNetworkError::ReceiveFailed(
268 "Cannot consume: no data available".to_string(),
269 )),
270 }
271 }
272
273 fn as_boxed_subscriber_properties(&self) -> Box<dyn FeagiClientSubscriberProperties> {
274 Box::new(FeagiZmqClientSubscriberProperties {
275 server_address: self.server_address.clone(),
276 })
277 }
278}
279
280#[derive(Debug, Clone, PartialEq)]
293pub struct FeagiZmqClientPusherProperties {
294 server_address: ZmqUrl,
295}
296
297impl FeagiZmqClientPusherProperties {
298 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
308 let zmq_url = ZmqUrl::new(server_address)?;
309 Ok(Self {
310 server_address: zmq_url,
311 })
312 }
313}
314
315impl FeagiClientPusherProperties for FeagiZmqClientPusherProperties {
316 fn as_boxed_client_pusher(&self) -> Box<dyn FeagiClientPusher> {
317 let context = Context::new();
318 let socket = context
319 .socket(zmq::PUSH)
320 .expect("Failed to create ZMQ PUSH socket");
321
322 Box::new(FeagiZmqClientPusher {
323 server_address: self.server_address.clone(),
324 current_state: FeagiEndpointState::Inactive,
325 context,
326 socket,
327 })
328 }
329
330 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
331 TransportProtocolEndpoint::Zmq(self.server_address.clone())
332 }
333}
334
335pub struct FeagiZmqClientPusher {
355 server_address: ZmqUrl,
356 current_state: FeagiEndpointState,
357 #[allow(dead_code)]
358 context: Context,
359 socket: Socket,
360}
361
362impl FeagiClient for FeagiZmqClientPusher {
363 fn poll(&mut self) -> &FeagiEndpointState {
364 &self.current_state
366 }
367
368 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
369 match &self.current_state {
370 FeagiEndpointState::Inactive => {
371 apply_common_client_zmq_tuning(&self.socket)?;
372 self.socket
373 .connect(&self.server_address.to_string())
374 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
375
376 self.current_state = FeagiEndpointState::ActiveWaiting;
377 Ok(())
378 }
379 _ => Err(FeagiNetworkError::InvalidSocketProperties(
380 "Cannot connect: client is not in Inactive state".to_string(),
381 )),
382 }
383 }
384
385 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
386 match &self.current_state {
387 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
388 self.socket
389 .disconnect(&self.server_address.to_string())
390 .map_err(|e| FeagiNetworkError::CannotDisconnect(e.to_string()))?;
391
392 self.current_state = FeagiEndpointState::Inactive;
393 Ok(())
394 }
395 _ => Err(FeagiNetworkError::InvalidSocketProperties(
396 "Cannot disconnect: client is not in Active state".to_string(),
397 )),
398 }
399 }
400
401 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
402 match &self.current_state {
403 FeagiEndpointState::Errored(_) => {
404 let _ = self.socket.disconnect(&self.server_address.to_string());
405 self.current_state = FeagiEndpointState::Inactive;
406 Ok(())
407 }
408 _ => Err(FeagiNetworkError::InvalidSocketProperties(
409 "Cannot confirm error: client is not in Errored state".to_string(),
410 )),
411 }
412 }
413
414 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
415 TransportProtocolEndpoint::Zmq(self.server_address.clone())
416 }
417}
418
419impl FeagiClientPusher for FeagiZmqClientPusher {
420 fn publish_data(&mut self, data: &[u8]) -> Result<(), FeagiNetworkError> {
421 match &self.current_state {
422 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
423 self.socket.send(data, zmq::DONTWAIT).map_err(|e| {
424 if e == zmq::Error::EAGAIN {
425 FeagiNetworkError::SendFailed("Socket would block".to_string())
426 } else {
427 FeagiNetworkError::SendFailed(e.to_string())
428 }
429 })?;
430 Ok(())
431 }
432 _ => Err(FeagiNetworkError::SendFailed(
433 "Cannot publish: client is not in Active state".to_string(),
434 )),
435 }
436 }
437
438 fn as_boxed_pusher_properties(&self) -> Box<dyn FeagiClientPusherProperties> {
439 Box::new(FeagiZmqClientPusherProperties {
440 server_address: self.server_address.clone(),
441 })
442 }
443}
444
445#[derive(Debug, Clone, PartialEq)]
458pub struct FeagiZmqClientRequesterProperties {
459 server_address: ZmqUrl,
460}
461
462impl FeagiZmqClientRequesterProperties {
463 pub fn new(server_address: &str) -> Result<Self, FeagiNetworkError> {
473 let zmq_url = ZmqUrl::new(server_address)?;
474 Ok(Self {
475 server_address: zmq_url,
476 })
477 }
478}
479
480impl FeagiClientRequesterProperties for FeagiZmqClientRequesterProperties {
481 fn as_boxed_client_requester(&self) -> Box<dyn FeagiClientRequester> {
482 let context = Context::new();
483 let socket = context
484 .socket(zmq::DEALER)
485 .expect("Failed to create ZMQ DEALER socket");
486
487 Box::new(FeagiZmqClientRequester {
488 server_address: self.server_address.clone(),
489 current_state: FeagiEndpointState::Inactive,
490 context,
491 socket,
492 delimiter_msg: Message::new(),
493 payload_msg: Message::new(),
494 has_data: false,
495 })
496 }
497
498 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
499 TransportProtocolEndpoint::Zmq(self.server_address.clone())
500 }
501}
502
503pub struct FeagiZmqClientRequester {
537 server_address: ZmqUrl,
538 current_state: FeagiEndpointState,
539 #[allow(dead_code)]
540 context: Context,
541 socket: Socket,
542 delimiter_msg: Message,
544 payload_msg: Message,
546 has_data: bool,
548}
549
550impl FeagiZmqClientRequester {
551 fn try_recv_response(&mut self) -> Result<bool, zmq::Error> {
555 self.socket.recv(&mut self.delimiter_msg, zmq::DONTWAIT)?;
557
558 if self.socket.get_rcvmore()? {
560 self.socket.recv(&mut self.payload_msg, 0)?;
562
563 while self.socket.get_rcvmore()? {
565 let mut discard = Message::new();
566 self.socket.recv(&mut discard, 0)?;
567 }
568 } else {
569 std::mem::swap(&mut self.delimiter_msg, &mut self.payload_msg);
571 }
572
573 Ok(true)
574 }
575}
576
577impl FeagiClient for FeagiZmqClientRequester {
578 fn poll(&mut self) -> &FeagiEndpointState {
579 if matches!(self.current_state, FeagiEndpointState::ActiveWaiting) && !self.has_data {
580 match self.try_recv_response() {
581 Ok(true) => {
582 self.has_data = true;
583 self.current_state = FeagiEndpointState::ActiveHasData;
584 }
585 Ok(false) => {
586 }
588 Err(zmq::Error::EAGAIN) => {
589 }
591 Err(e) => {
592 self.current_state = FeagiEndpointState::Errored(
593 FeagiNetworkError::ReceiveFailed(e.to_string()),
594 );
595 }
596 }
597 }
598 &self.current_state
599 }
600
601 fn request_connect(&mut self) -> Result<(), FeagiNetworkError> {
602 match &self.current_state {
603 FeagiEndpointState::Inactive => {
604 apply_common_client_zmq_tuning(&self.socket)?;
605 self.socket
606 .connect(&self.server_address.to_string())
607 .map_err(|e| FeagiNetworkError::CannotConnect(e.to_string()))?;
608
609 self.current_state = FeagiEndpointState::ActiveWaiting;
610 Ok(())
611 }
612 _ => Err(FeagiNetworkError::InvalidSocketProperties(
613 "Cannot connect: client is not in Inactive state".to_string(),
614 )),
615 }
616 }
617
618 fn request_disconnect(&mut self) -> Result<(), FeagiNetworkError> {
619 match &self.current_state {
620 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
621 self.socket
622 .disconnect(&self.server_address.to_string())
623 .map_err(|e| FeagiNetworkError::CannotDisconnect(e.to_string()))?;
624
625 self.has_data = false;
626 self.current_state = FeagiEndpointState::Inactive;
627 Ok(())
628 }
629 _ => Err(FeagiNetworkError::InvalidSocketProperties(
630 "Cannot disconnect: client is not in Active state".to_string(),
631 )),
632 }
633 }
634
635 fn confirm_error_and_close(&mut self) -> Result<(), FeagiNetworkError> {
636 match &self.current_state {
637 FeagiEndpointState::Errored(_) => {
638 let _ = self.socket.disconnect(&self.server_address.to_string());
639 self.has_data = false;
640 self.current_state = FeagiEndpointState::Inactive;
641 Ok(())
642 }
643 _ => Err(FeagiNetworkError::InvalidSocketProperties(
644 "Cannot confirm error: client is not in Errored state".to_string(),
645 )),
646 }
647 }
648
649 fn get_endpoint_target(&self) -> TransportProtocolEndpoint {
650 TransportProtocolEndpoint::Zmq(self.server_address.clone())
651 }
652}
653
654impl FeagiClientRequester for FeagiZmqClientRequester {
655 fn publish_request(&mut self, request: &[u8]) -> Result<(), FeagiNetworkError> {
656 match &self.current_state {
657 FeagiEndpointState::ActiveWaiting | FeagiEndpointState::ActiveHasData => {
658 let frames: &[&[u8]] = &[&[], request];
660 self.socket
661 .send_multipart(frames, zmq::DONTWAIT)
662 .map_err(|e| {
663 if e == zmq::Error::EAGAIN {
664 FeagiNetworkError::SendFailed("Socket would block".to_string())
665 } else {
666 FeagiNetworkError::SendFailed(e.to_string())
667 }
668 })?;
669 Ok(())
670 }
671 _ => Err(FeagiNetworkError::SendFailed(
672 "Cannot send request: client is not in Active state".to_string(),
673 )),
674 }
675 }
676
677 fn consume_retrieved_response(&mut self) -> Result<&[u8], FeagiNetworkError> {
678 match &self.current_state {
679 FeagiEndpointState::ActiveHasData => {
680 if self.has_data {
681 self.has_data = false;
682 self.current_state = FeagiEndpointState::ActiveWaiting;
683 Ok(&self.payload_msg)
684 } else {
685 Err(FeagiNetworkError::ReceiveFailed(
686 "No data available despite ActiveHasData state".to_string(),
687 ))
688 }
689 }
690 _ => Err(FeagiNetworkError::ReceiveFailed(
691 "Cannot consume: no response available".to_string(),
692 )),
693 }
694 }
695
696 fn as_boxed_requester_properties(&self) -> Box<dyn FeagiClientRequesterProperties> {
697 Box::new(FeagiZmqClientRequesterProperties {
698 server_address: self.server_address.clone(),
699 })
700 }
701}
702
703