1use std::net::ToSocketAddrs;
2use std::sync::Arc;
3
4use log::info;
5
6use crate::connection::Connection;
7use crate::error::{Error, Result};
8use crate::protocol::{Command, Frame, IdentifyConfig, ProtocolError};
9
10pub async fn lookup_nsqd_nodes<A: ToSocketAddrs + std::fmt::Display>(
12 nsqlookupd_addr: A,
13 topic: &str,
14) -> Result<Vec<String>> {
15 info!(
19 "正在查询 {} 上的nsqlookupd以查找主题 {} 的生产者",
20 nsqlookupd_addr, topic
21 );
22
23 Ok(vec![nsqlookupd_addr.to_string()])
29}
30
31pub async fn create_nsqd_connection<A: ToSocketAddrs + std::fmt::Display>(
33 nsqd_addr: A,
34 identify_config: Option<IdentifyConfig>,
35 auth_secret: Option<String>,
36) -> Result<Arc<Connection>> {
37 let connection = Connection::new(
38 nsqd_addr,
39 identify_config,
40 auth_secret,
41 std::time::Duration::from_secs(60), std::time::Duration::from_secs(1), )
44 .await?;
45
46 Ok(Arc::new(connection))
47}
48
49pub async fn publish_message<A: ToSocketAddrs + std::fmt::Display>(
51 nsqd_addr: A,
52 topic: &str,
53 message: Vec<u8>,
54) -> Result<()> {
55 let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
56
57 info!("向主题 {} 发布消息", topic);
58
59 connection
60 .send_command(Command::Publish(topic.to_string(), message))
61 .await?;
62
63 match connection.read_frame().await? {
65 Frame::Response(_) => Ok(()),
66 Frame::Error(data) => {
67 let error_str = String::from_utf8_lossy(&data);
68 Err(Error::Protocol(ProtocolError::Other(format!(
69 "发布消息时出错: {}",
70 error_str
71 ))))
72 }
73 _ => Err(Error::Protocol(ProtocolError::Other(
74 "发布消息时收到意外响应".to_string(),
75 ))),
76 }
77}
78
79pub async fn mpublish_messages<A: ToSocketAddrs + std::fmt::Display>(
81 nsqd_addr: A,
82 topic: &str,
83 messages: Vec<Vec<u8>>,
84) -> Result<()> {
85 if messages.is_empty() {
86 return Ok(());
87 }
88
89 let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
90
91 info!("向主题 {} 批量发布 {} 条消息", topic, messages.len());
92
93 connection
94 .send_command(Command::Mpublish(topic.to_string(), messages))
95 .await?;
96
97 match connection.read_frame().await? {
99 Frame::Response(_) => Ok(()),
100 Frame::Error(data) => {
101 let error_str = String::from_utf8_lossy(&data);
102 Err(Error::Protocol(ProtocolError::Other(format!(
103 "批量发布消息时出错: {}",
104 error_str
105 ))))
106 }
107 _ => Err(Error::Protocol(ProtocolError::Other(
108 "批量发布消息时收到意外响应".to_string(),
109 ))),
110 }
111}
112
113pub async fn subscribe<A: ToSocketAddrs + std::fmt::Display>(
115 nsqd_addr: A,
116 topic: &str,
117 channel: &str,
118 ready_count: u32,
119) -> Result<Arc<Connection>> {
120 let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
121
122 info!("订阅主题 {}, 频道 {}", topic, channel);
123
124 connection
126 .send_command(Command::Subscribe(topic.to_string(), channel.to_string()))
127 .await?;
128
129 match connection.read_frame().await? {
131 Frame::Response(_) => {
132 info!("成功订阅");
133
134 connection.send_command(Command::Ready(ready_count)).await?;
136
137 Ok(connection)
138 }
139 Frame::Error(data) => {
140 let error_str = String::from_utf8_lossy(&data);
141 Err(Error::Protocol(ProtocolError::Other(format!(
142 "订阅时出错: {}",
143 error_str
144 ))))
145 }
146 _ => Err(Error::Protocol(ProtocolError::Other(
147 "订阅时收到意外响应".to_string(),
148 ))),
149 }
150}