rocketmq_remoting/runtime/
connection_handler_context.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use rocketmq_rust::ArcMut;
19use tracing::error;
20
21use crate::connection::Connection;
22use crate::net::channel::Channel;
23use crate::protocol::remoting_command::RemotingCommand;
24
25pub type ConnectionHandlerContext = ArcMut<ConnectionHandlerContextWrapper>;
26
27#[derive(Hash, Eq, PartialEq)]
28pub struct ConnectionHandlerContextWrapper {
29    // pub(crate) connection: Connection,
30    pub(crate) channel: Channel,
31}
32
33impl ConnectionHandlerContextWrapper {
34    // pub fn new(connection: Connection, channel: Channel) -> Self {
35    pub fn new(channel: Channel) -> Self {
36        Self {
37            //connection,
38            channel,
39        }
40    }
41
42    pub fn connection(&self) -> Option<&Connection> {
43        /*match self.channel.upgrade() {
44            None => None,
45            Some(channel) => Some(channel.connection_ref()),
46        }*/
47        unimplemented!("connection() is not implemented");
48    }
49
50    pub async fn write(&mut self, cmd: RemotingCommand) {
51        match self.channel.upgrade() {
52            Some(mut channel) => match channel.connection_mut().send_command(cmd).await {
53                Ok(_) => {}
54                Err(error) => {
55                    error!("send response failed: {}", error);
56                }
57            },
58            None => {
59                error!("channel is closed");
60            }
61        }
62    }
63    pub async fn write_ref(&mut self, cmd: &mut RemotingCommand) {
64        match self.channel.upgrade() {
65            Some(mut channel) => match channel.connection_mut().send_command_ref(cmd).await {
66                Ok(_) => {}
67                Err(error) => {
68                    error!("send response failed: {}", error);
69                }
70            },
71            None => {
72                error!("channel is closed");
73            }
74        }
75    }
76
77    pub fn channel(&self) -> &Channel {
78        &self.channel
79    }
80
81    pub fn channel_mut(&mut self) -> &mut Channel {
82        &mut self.channel
83    }
84}
85
86impl AsRef<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
87    fn as_ref(&self) -> &ConnectionHandlerContextWrapper {
88        self
89    }
90}
91
92impl AsMut<ConnectionHandlerContextWrapper> for ConnectionHandlerContextWrapper {
93    fn as_mut(&mut self) -> &mut ConnectionHandlerContextWrapper {
94        self
95    }
96}