chat_system/servers/
irc.rs1use crate::message::Message;
4use crate::server::{ChatListener, MessageHandler};
5use anyhow::Result;
6use async_trait::async_trait;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::TcpListener;
10
11pub struct IrcListener {
33 address: String,
34 shutdown_tx: Option<tokio::sync::watch::Sender<bool>>,
35}
36
37impl IrcListener {
38 pub fn new(address: impl Into<String>) -> Self {
41 Self {
42 address: address.into(),
43 shutdown_tx: None,
44 }
45 }
46}
47
48pub(super) async fn handle_connection(
54 stream: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
55 handler: MessageHandler,
56) -> Result<()> {
57 let (reader, mut writer) = tokio::io::split(stream);
58 let mut lines = BufReader::new(reader).lines();
59 let mut nick = String::new();
60 let mut user_seen = false;
61 let mut registered = false;
62
63 while let Some(line) = lines.next_line().await? {
64 let line = line.trim().to_string();
65 if line.is_empty() {
66 continue;
67 }
68
69 if let Some(rest) = line.strip_prefix("NICK ") {
70 nick = rest.trim().to_string();
71 } else if line.starts_with("USER ") {
72 user_seen = true;
73 } else if line.starts_with("PING ") {
74 let token = line.trim_start_matches("PING ");
75 writer
76 .write_all(format!("PONG {}\r\n", token).as_bytes())
77 .await?;
78 } else if let Some(rest) = line.strip_prefix("PRIVMSG ") {
79 let parts: Vec<&str> = rest.splitn(2, ' ').collect();
80 if parts.len() == 2 {
81 let target = parts[0];
82 let content = parts[1].trim_start_matches(':');
83 let msg = Message {
84 id: format!("irc-{}", chrono::Utc::now().timestamp_millis()),
85 sender: nick.clone(),
86 content: content.to_string(),
87 timestamp: chrono::Utc::now().timestamp(),
88 channel: Some(target.to_string()),
89 reply_to: None,
90 media: None,
91 is_direct: !target.starts_with('#'),
92 reactions: None,
93 };
94 if let Ok(Some(reply)) = handler(msg).await {
95 let response =
96 format!(":server!server@localhost PRIVMSG {} :{}\r\n", target, reply);
97 writer.write_all(response.as_bytes()).await?;
98 }
99 }
100 } else if line == "QUIT" || line.starts_with("QUIT ") {
101 break;
102 }
103
104 if !registered && !nick.is_empty() && user_seen {
105 writer
106 .write_all(format!(":localhost 001 {} :Welcome\r\n", nick).as_bytes())
107 .await?;
108 registered = true;
109 }
110 }
111 Ok(())
112}
113
114#[async_trait]
115impl ChatListener for IrcListener {
116 fn address(&self) -> &str {
117 &self.address
118 }
119
120 fn protocol(&self) -> &str {
121 "irc"
122 }
123
124 async fn start(
125 &mut self,
126 handler: MessageHandler,
127 alive: tokio::sync::mpsc::Sender<()>,
128 ) -> Result<()> {
129 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
130 let listener = TcpListener::bind(&self.address).await?;
131 self.address = listener.local_addr()?.to_string();
133 tracing::info!(address = %self.address, "IRC listener bound");
134 self.shutdown_tx = Some(shutdown_tx);
135
136 tokio::spawn(async move {
137 let _alive = alive;
140
141 loop {
142 tokio::select! {
143 result = listener.accept() => {
144 match result {
145 Ok((stream, peer)) => {
146 tracing::debug!(%peer, "IRC listener: new connection");
147 let h = Arc::clone(&handler);
148 tokio::spawn(async move {
149 if let Err(e) = handle_connection(stream, h).await {
150 tracing::warn!("IRC connection error: {e}");
151 }
152 });
153 }
154 Err(e) => {
155 tracing::warn!("IRC listener accept error: {e}");
156 break;
157 }
158 }
159 }
160 _ = shutdown_rx.changed() => {
161 if *shutdown_rx.borrow() {
162 break;
163 }
164 }
165 }
166 }
167 });
168
169 Ok(())
170 }
171
172 async fn shutdown(&mut self) -> Result<()> {
173 if let Some(tx) = &self.shutdown_tx {
174 let _ = tx.send(true);
175 }
176 Ok(())
177 }
178}