1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
use crate::{
    client_channel::FC_Sender,
    client_half_requests::{
        ConsumerFlowHalfRequest, ConsumerRedeliverUnacknowledgedMessagesHalfRequest, HalfRequest,
    },
    client_handler::PendingRequestValue,
    client_responds::{ConsumerAckRespond, Respond},
    command::Command,
    commands::MessageCommand,
    types::{ConsumerId, RequestId, RequestIdBuilder},
};

use super::handler_reply_consumer_channel_message::{
    HandlerReplyConsumerAckChannelMessage, HandlerReplyConsumerFlowChannelMessage,
    HandlerReplyConsumerGetMessageChannelMessage,
    HandlerReplyConsumerRedeliverUnacknowledgedMessagesChannelMessage,
};

/// A consumer operation paired with the sender on which its reply is delivered.
///
/// Variants that carry a request hold it in its typed form; it is only turned
/// into a [`Command`] by [`ConsumerSendHandlerChannelMessage::into_group`].
pub enum ConsumerSendHandlerChannelMessage {
    /// A flow request and the sender for its reply.
    Flow(
        <ConsumerFlowHalfRequest as HalfRequest>::Request,
        FC_Sender<HandlerReplyConsumerFlowChannelMessage>,
    ),
    /// A request for a message; carries only the reply sender.
    GetMessage(FC_Sender<HandlerReplyConsumerGetMessageChannelMessage>),
    /// An ack request and the sender for its reply.
    Ack(
        <ConsumerAckRespond as Respond>::Request,
        FC_Sender<HandlerReplyConsumerAckChannelMessage>,
    ),
    /// A redeliver-unacknowledged-messages request and the sender for its reply.
    RedeliverUnacknowledgedMessages(
        <ConsumerRedeliverUnacknowledgedMessagesHalfRequest as HalfRequest>::Request,
        FC_Sender<HandlerReplyConsumerRedeliverUnacknowledgedMessagesChannelMessage>,
    ),
}

impl ConsumerSendHandlerChannelMessage {
    /// Consumes the message and converts it into its
    /// [`ConsumerSendHandlerChannelMessageGroup`] form.
    ///
    /// * `consumer_id` is written into every variant that carries a request
    ///   (`Flow`, `Ack`, `RedeliverUnacknowledgedMessages`) before that request
    ///   is converted into a [`Command`].
    /// * `request_id_builder` is consulted only for `Ack`: a fresh id is drawn
    ///   from it, written into the request, and returned alongside the command
    ///   in the `PendingRequest` variant.
    ///
    /// `GetMessage` is forwarded as-is; every reply sender is passed through
    /// unchanged.
    pub fn into_group(
        self,
        consumer_id: ConsumerId,
        request_id_builder: &RequestIdBuilder,
    ) -> ConsumerSendHandlerChannelMessageGroup {
        // Short local alias; the full enum name makes the arms hard to scan.
        use self::ConsumerSendHandlerChannelMessageGroup as Group;

        match self {
            Self::GetMessage(reply_tx) => Group::GetMessage(reply_tx),
            Self::Flow(mut request, reply_tx) => {
                request.set_consumer_id(consumer_id);
                Group::Flow(Command::from(&request), reply_tx)
            }
            Self::RedeliverUnacknowledgedMessages(mut request, reply_tx) => {
                request.set_consumer_id(consumer_id);
                Group::RedeliverUnacknowledgedMessages(Command::from(&request), reply_tx)
            }
            Self::Ack(mut request, reply_tx) => {
                request.set_consumer_id(consumer_id);
                let request_id = request_id_builder.next();
                // The request gets its own copy of the id; the original is
                // handed back to the caller in the `PendingRequest` variant.
                request.set_request_id(request_id.to_owned());
                let boxed_command = Box::new(Command::from(&request));
                Group::PendingRequest(
                    request_id,
                    PendingRequestValue::ConsumerAck(reply_tx),
                    boxed_command,
                )
            }
        }
    }
}

/// The result of [`ConsumerSendHandlerChannelMessage::into_group`]: requests
/// already converted to [`Command`]s, with request-id-tracked operations
/// collapsed into a single `PendingRequest` variant.
pub enum ConsumerSendHandlerChannelMessageGroup {
    /// A flow command and the sender for its `Result` reply.
    Flow(
        Command,
        FC_Sender<Result<(), <ConsumerFlowHalfRequest as HalfRequest>::Error>>,
    ),
    /// A request for a message; the reply is an optional [`MessageCommand`].
    GetMessage(FC_Sender<Option<MessageCommand>>),
    /// A command tagged with a request id, plus the value kept for the pending
    /// request (built from the reply sender in `into_group`).
    PendingRequest(RequestId, PendingRequestValue, Box<Command>),
    /// A redeliver-unacknowledged-messages command and the sender for its
    /// `Result` reply.
    RedeliverUnacknowledgedMessages(
        Command,
        FC_Sender<
            Result<(), <ConsumerRedeliverUnacknowledgedMessagesHalfRequest as HalfRequest>::Error>,
        >,
    ),
}