use async_std::future;
use async_std::io;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::sync::{channel, Arc, Mutex};
use async_std::{stream, task};
use std::collections::HashSet;
pub mod data;
pub mod reader;
pub mod utils;
pub mod writer;
pub use async_trait::async_trait;
pub use data::{
Address, IdentifyConfig, Msg, NsqHttpdInitConfig, NsqLookupdConfig, NsqdClientConfig,
NsqdConfig, NsqdInitConfig,
};
pub use reader::Reader;
pub use utils::read_toml_config;
pub use writer::Writer;
/// Callback invoked for every message delivered to a subscription.
///
/// [`subscribe`] interprets the return value as follows: `Some(msg)` makes it
/// push `msg` back onto the reader's channel and reconnect, while `None` lets
/// it continue with the next message.
#[async_trait]
pub trait MsgHandler {
    /// Processes `msg`.
    ///
    /// The default implementation does no work and hands the message straight
    /// back as `Some(msg)`.
    async fn handler(&self, msg: Msg) -> Option<Msg> {
        Some(msg)
    }
}
/// State needed to discover nsqd producers for one topic through nsqlookupd
/// and to subscribe to each of them.
pub struct NsqLookupd {
    /// Host (and port) of the nsqlookupd HTTP endpoint; it is interpolated
    /// into `http://{address}/lookup?topic={topic}`.
    pub address: String,
    /// Producers for which a [`subscribe`] task has already been spawned.
    pub cached_address: HashSet<NsqdConfig>,
    /// Topic that is looked up and subscribed to.
    pub topic: String,
    /// NSQ channel name forwarded to every [`subscribe`] task.
    pub nsq_channel: String,
    /// Identify settings; serialized to JSON and handed to each new
    /// [`subscribe`] task.
    pub identify_config: IdentifyConfig,
}
impl NsqLookupd {
    /// Creates a lookup client with an empty producer cache.
    ///
    /// * `address` - nsqlookupd HTTP endpoint, used as
    ///   `http://{address}/lookup?topic={topic}`.
    /// * `topic` / `nsq_channel` - forwarded to every [`subscribe`] task.
    /// * `identify_config` - serialized to JSON for each new subscription.
    pub fn new(
        address: String,
        topic: String,
        nsq_channel: String,
        identify_config: IdentifyConfig,
    ) -> NsqLookupd {
        NsqLookupd {
            address,
            topic,
            cached_address: HashSet::new(),
            nsq_channel,
            identify_config,
        }
    }

    /// Runs one producer lookup per tick of `interval`.
    ///
    /// A failed lookup is reported on stderr and the loop keeps going, so a
    /// temporarily unreachable nsqlookupd does not stop discovery.
    ///
    /// Returns `Ok(())` once `interval` stops yielding ticks; this function
    /// itself never produces an error.
    pub async fn periodically_lookup(
        &mut self,
        mut interval: stream::Interval,
        handler: Arc<Box<dyn MsgHandler + Send + Sync>>,
    ) -> io::Result<()> {
        while interval.next().await.is_some() {
            // Best effort: surface the failure instead of silently dropping it,
            // then wait for the next tick.
            if let Err(err) = self.nsqlookup(handler.clone()).await {
                eprintln!("nsqlookupd lookup failed: {}", err);
            }
        }
        Ok(())
    }

    /// Queries nsqlookupd for the producers of `self.topic` and spawns a
    /// detached [`subscribe`] task for every producer not seen before.
    ///
    /// # Errors
    ///
    /// Fails when the HTTP request fails, the response body cannot be read,
    /// the body is not a valid `NsqLookupdConfig` JSON document, or the
    /// identify configuration cannot be serialized.
    ///
    /// NOTE(review): a producer is added to `cached_address` as soon as its
    /// task is spawned and nothing in this block ever removes it, so a
    /// producer whose `subscribe` task later returns an error is not retried.
    async fn nsqlookup(
        &mut self,
        handler: Arc<Box<dyn MsgHandler + Send + Sync>>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let url = format!(
            "http://{address}/lookup?topic={topic}",
            address = self.address,
            topic = self.topic
        );
        dbg!(&url);
        let mut res = surf::get(url).await?;
        let body = res.body_string().await?;
        dbg!(&body);
        let nsq_lookupd_config: NsqLookupdConfig = serde_json::from_str(body.as_str())?;
        for nsqd_config in nsq_lookupd_config.producers {
            if self.cached_address.contains(&nsqd_config) {
                eprintln!("address exists, {:?}", nsqd_config);
                continue;
            }
            eprintln!("find new address, {:?}", nsqd_config);
            // Serialized only when a new producer actually shows up.
            let identify_config = serde_json::to_string(&self.identify_config)?;
            let addr = Address::ReaderdAddr(format!(
                "{}:{}",
                nsqd_config.broadcast_address, nsqd_config.tcp_port,
            ));
            // The join handle is dropped on purpose: the subscription runs
            // detached for as long as it can.
            task::spawn(subscribe(
                addr,
                handler.clone(),
                self.topic.clone(),
                self.nsq_channel.clone(),
                identify_config,
            ));
            self.cached_address.insert(nsqd_config);
        }
        Ok(())
    }
}
/// Connects to the nsqd at `address`, subscribes to `nsq_topic` /
/// `nsq_channel` and feeds every received message to `handler`.
///
/// The function loops for as long as the connection can be re-established:
///
/// * `Some(Some(msg))` from the reader's channel is passed to `handler`. If
///   the handler returns a message, that message is sent back on the reader's
///   channel and the connection is re-established; if it returns `None` the
///   next message is awaited.
/// * `Some(None)` from the channel triggers a reconnect.
/// * `None` (channel closed) ends the subscription with an error.
///
/// After each reconnect the topic/channel subscription is issued again.
///
/// # Errors
///
/// Returns the first I/O error raised while creating the reader, connecting,
/// subscribing or reconnecting, or an error when the reader's message channel
/// is closed. The function never returns `Ok`; the `Reader` in the return
/// type is kept only so the signature stays unchanged.
///
/// NOTE(review): the handed-back message is sent on `reader.msg_sender`,
/// which looks like the sending half of the channel this loop reads from; if
/// that bounded channel were full the send would have to wait — confirm that
/// this cannot stall.
pub async fn subscribe(
    address: Address,
    handler: Arc<Box<dyn MsgHandler + Send + Sync>>,
    nsq_topic: String,
    nsq_channel: String,
    identify_config: String,
) -> io::Result<Reader> {
    let mut reader = create_reader(address, identify_config).await?;
    reader.connect().await?;
    loop {
        reader.sub(nsq_topic.as_str(), nsq_channel.as_str()).await?;
        loop {
            dbg!("msg_receiver");
            let msg_received = reader.msg_receiver.recv().await;
            match msg_received {
                Some(Some(msg)) => {
                    if let Some(n_msg) = handler.handler(msg).await {
                        reader.msg_sender.send(Some(n_msg)).await;
                        break;
                    }
                }
                Some(None) => break,
                None => {
                    // With every sender gone `recv` would keep yielding `None`
                    // immediately; bail out instead of spinning forever.
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "message channel closed",
                    ));
                }
            }
        }
        dbg!("reconnect tcpstream");
        reader.reconnect().await?;
    }
}
/// Opens a TCP connection to the nsqd named by `address` and wraps it in a
/// [`Reader`] that is not yet marked as connected.
///
/// The reader is created with `max_inflight` set to 50, a message channel of
/// capacity 100 and an empty data buffer; `identify_config` is stored as-is.
///
/// # Errors
///
/// Returns an `"address error"` I/O error when `address` is not an
/// `Address::ReaderdAddr`, or the underlying error when the TCP connection
/// cannot be established.
pub async fn create_reader(address: Address, identify_config: String) -> io::Result<Reader> {
    // Reject unsupported address kinds before doing any I/O.
    let target = match &address {
        Address::ReaderdAddr(m_addr) => m_addr,
        _ => return Err(io::Error::new(io::ErrorKind::Other, "address error")),
    };
    let stream = Arc::new(TcpStream::connect(target).await?);
    let (tx, msg_receiver) = channel(100);
    Ok(Reader {
        stream,
        max_inflight: 50,
        connected: Arc::new(Mutex::new(false)),
        msg_sender: Arc::new(tx),
        msg_receiver,
        data_bufer: Arc::new(Mutex::new(Vec::new())),
        address,
        identify_config,
    })
}
/// Opens a TCP connection to the nsqd named by `address` and wraps it in a
/// [`Writer`].
///
/// # Errors
///
/// Returns an `"address error"` I/O error when `address` is not an
/// `Address::ReaderdAddr`, or the underlying error when the TCP connection
/// cannot be established.
///
/// NOTE(review): the writer accepts the `ReaderdAddr` variant; confirm that
/// this is the intended variant for writer addresses.
pub async fn create_writer(address: Address) -> io::Result<Writer> {
    if let Address::ReaderdAddr(m_addr) = &address {
        let stream = Arc::new(TcpStream::connect(m_addr).await?);
        return Ok(Writer { stream, address });
    }
    // Plain `Err(..)` rather than `Err(..)?`: the error already has the
    // function's error type, so the extra `?` only obscured the return.
    Err(io::Error::new(io::ErrorKind::Other, "address error"))
}