nsq_async_rs/
commands.rs

1use std::net::ToSocketAddrs;
2use std::sync::Arc;
3
4use log::info;
5
6use crate::connection::Connection;
7use crate::error::{Error, Result};
8use crate::protocol::{Command, Frame, IdentifyConfig, ProtocolError};
9
10/// 发现NSQ服务器
11pub async fn lookup_nsqd_nodes<A: ToSocketAddrs + std::fmt::Display>(
12    nsqlookupd_addr: A,
13    topic: &str,
14) -> Result<Vec<String>> {
15    // 这是一个简化版实现,真实场景中应当使用HTTP API查询nsqlookupd
16    // 未来版本可实现对nsqlookupd HTTP API的完整支持
17
18    info!(
19        "正在查询 {} 上的nsqlookupd以查找主题 {} 的生产者",
20        nsqlookupd_addr, topic
21    );
22
23    // 实际应用中,这里应该向nsqlookupd发送HTTP请求
24    // 例如:GET /lookup?topic={topic}
25    // 并解析响应以获取nsqd节点列表
26
27    // 作为简化示例,这里假设我们直接使用提供的nsqlookupd地址作为nsqd节点
28    Ok(vec![nsqlookupd_addr.to_string()])
29}
30
31/// 创建到NSQ服务器的连接
32pub async fn create_nsqd_connection<A: ToSocketAddrs + std::fmt::Display>(
33    nsqd_addr: A,
34    identify_config: Option<IdentifyConfig>,
35    auth_secret: Option<String>,
36) -> Result<Arc<Connection>> {
37    let connection = Connection::new(
38        nsqd_addr,
39        identify_config,
40        auth_secret,
41        std::time::Duration::from_secs(60), // 默认读超时
42        std::time::Duration::from_secs(1),  // 默认写超时
43    )
44    .await?;
45
46    Ok(Arc::new(connection))
47}
48
49/// 向NSQ主题发布消息
50pub async fn publish_message<A: ToSocketAddrs + std::fmt::Display>(
51    nsqd_addr: A,
52    topic: &str,
53    message: Vec<u8>,
54) -> Result<()> {
55    let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
56
57    info!("向主题 {} 发布消息", topic);
58
59    connection
60        .send_command(Command::Publish(topic.to_string(), message))
61        .await?;
62
63    // 读取响应以确认发布成功
64    match connection.read_frame().await? {
65        Frame::Response(_) => Ok(()),
66        Frame::Error(data) => {
67            let error_str = String::from_utf8_lossy(&data);
68            Err(Error::Protocol(ProtocolError::Other(format!(
69                "发布消息时出错: {}",
70                error_str
71            ))))
72        }
73        _ => Err(Error::Protocol(ProtocolError::Other(
74            "发布消息时收到意外响应".to_string(),
75        ))),
76    }
77}
78
79/// 批量向NSQ主题发布消息
80pub async fn mpublish_messages<A: ToSocketAddrs + std::fmt::Display>(
81    nsqd_addr: A,
82    topic: &str,
83    messages: Vec<Vec<u8>>,
84) -> Result<()> {
85    if messages.is_empty() {
86        return Ok(());
87    }
88
89    let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
90
91    info!("向主题 {} 批量发布 {} 条消息", topic, messages.len());
92
93    connection
94        .send_command(Command::Mpublish(topic.to_string(), messages))
95        .await?;
96
97    // 读取响应以确认发布成功
98    match connection.read_frame().await? {
99        Frame::Response(_) => Ok(()),
100        Frame::Error(data) => {
101            let error_str = String::from_utf8_lossy(&data);
102            Err(Error::Protocol(ProtocolError::Other(format!(
103                "批量发布消息时出错: {}",
104                error_str
105            ))))
106        }
107        _ => Err(Error::Protocol(ProtocolError::Other(
108            "批量发布消息时收到意外响应".to_string(),
109        ))),
110    }
111}
112
113/// 从NSQ主题订阅消息
114pub async fn subscribe<A: ToSocketAddrs + std::fmt::Display>(
115    nsqd_addr: A,
116    topic: &str,
117    channel: &str,
118    ready_count: u32,
119) -> Result<Arc<Connection>> {
120    let connection = create_nsqd_connection(nsqd_addr, None, None).await?;
121
122    info!("订阅主题 {}, 频道 {}", topic, channel);
123
124    // 发送订阅命令
125    connection
126        .send_command(Command::Subscribe(topic.to_string(), channel.to_string()))
127        .await?;
128
129    // 读取订阅响应
130    match connection.read_frame().await? {
131        Frame::Response(_) => {
132            info!("成功订阅");
133
134            // 设置初始RDY计数
135            connection.send_command(Command::Ready(ready_count)).await?;
136
137            Ok(connection)
138        }
139        Frame::Error(data) => {
140            let error_str = String::from_utf8_lossy(&data);
141            Err(Error::Protocol(ProtocolError::Other(format!(
142                "订阅时出错: {}",
143                error_str
144            ))))
145        }
146        _ => Err(Error::Protocol(ProtocolError::Other(
147            "订阅时收到意外响应".to_string(),
148        ))),
149    }
150}