rocketmq_remoting/runtime/
connection_handler_context.rs1use rocketmq_rust::ArcMut;
19use tracing::error;
20
21use crate::connection::Connection;
22use crate::net::channel::Channel;
23use crate::protocol::remoting_command::RemotingCommand;
24
25pub type ConnectionHandlerContext = ArcMut<ConnectionHandlerContextWrapper>;
26
27#[derive(Hash, Eq, PartialEq)]
28pub struct ConnectionHandlerContextWrapper {
29 pub(crate) channel: Channel,
31}
32
33impl ConnectionHandlerContextWrapper {
34 pub fn new(channel: Channel) -> Self {
36 Self {
37 channel,
39 }
40 }
41
42 pub fn connection(&self) -> Option<&Connection> {
43 unimplemented!("connection() is not implemented");
48 }
49
50 pub async fn write(&mut self, cmd: RemotingCommand) {
51 match self.channel.upgrade() {
52 Some(mut channel) => match channel.connection_mut().send_command(cmd).await {
53 Ok(_) => {}
54 Err(error) => {
55 error!("send response failed: {}", error);
56 }
57 },
58 None => {
59 error!("channel is closed");
60 }
61 }
62 }
63 pub async fn write_ref(&mut self, cmd: &mut RemotingCommand) {
64 match self.channel.upgrade() {
65 Some(mut channel) => match channel.connection_mut().send_command_ref(cmd).await {
66 Ok(_) => {}
67 Err(error) => {
68 error!("send response failed: {}", error);
69 }
70 },
71 None => {
72 error!("channel is closed");
73 }
74 }
75 }
76
77 pub fn channel(&self) -> &Channel {
78 &self.channel
79 }
80
81 pub fn channel_mut(&mut self) -> &mut Channel {
82 &mut self.channel
83 }
84}
85
86impl AsRef<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
87 fn as_ref(&self) -> &ConnectionHandlerContextWrapper {
88 self
89 }
90}
91
92impl AsMut<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
93 fn as_mut(&mut self) -> &mut ConnectionHandlerContextWrapper {
94 self
95 }
96}