headless_talk/handler/
mod.rs1pub mod error;
2
3use diesel::{dsl::exists, BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
4use futures_loco_protocol::loco_protocol::command::BoxedCommand;
5use talk_loco_client::talk::stream::{
6 command::{ChgMeta, DecunRead, Kickout, Left, Msg, SyncDlMsg, SyncJoin},
7 StreamCommand,
8};
9
10use crate::{
11 conn::Conn,
12 database::{
13 model::{
14 channel::meta::ChannelMetaRow,
15 chat::{ChatRow, ChatUpdate},
16 },
17 schema::{self, channel_list, channel_meta, chat},
18 },
19 event::{channel::ChannelEvent, ClientEvent},
20 updater::channel::ChannelUpdater,
21};
22
23use self::error::HandlerError;
24
25type HandlerResult = Result<Option<ClientEvent>, HandlerError>;
26
27#[derive(Debug, Clone)]
28pub(crate) struct SessionHandler {
29 conn: Conn,
30}
31
32impl SessionHandler {
33 pub fn new(conn: Conn) -> Self {
34 Self { conn }
35 }
36
37 pub async fn handle(&self, read: BoxedCommand) -> HandlerResult {
38 match StreamCommand::deserialize_from(read)? {
39 StreamCommand::Kickout(kickout) => self.on_kickout(kickout).await,
40 StreamCommand::SwitchServer => self.on_switch_server().await,
41 StreamCommand::Chat(msg) => self.on_chat(msg).await,
42 StreamCommand::ChatRead(read) => self.on_chat_read(read).await,
43 StreamCommand::ChangeMeta(meta) => self.on_meta_change(meta).await,
44 StreamCommand::SyncChatDeletion(deletion) => self.on_chat_deleted(deletion).await,
45 StreamCommand::SyncChannelJoin(sync_join) => self.on_channel_join(sync_join).await,
46 StreamCommand::Left(left) => self.on_left(left).await,
47
48 _ => Ok(None),
49 }
50 }
51
52 async fn on_kickout(&self, kickout: Kickout) -> HandlerResult {
53 Ok(Some(ClientEvent::Kickout(kickout.reason)))
54 }
55
56 async fn on_switch_server(&self) -> HandlerResult {
57 Ok(Some(ClientEvent::SwitchServer))
58 }
59
60 async fn on_chat(&self, msg: Msg) -> HandlerResult {
61 let exists = self
62 .conn
63 .pool
64 .spawn({
65 let row = ChatRow::from_chatlog(msg.chatlog.clone(), None);
66
67 move |conn| {
68 let count = diesel::select(exists(
69 channel_list::table.filter(channel_list::id.eq(row.channel_id)),
70 ))
71 .execute(conn)?;
72
73 diesel::replace_into(chat::table)
74 .values(row)
75 .execute(conn)?;
76
77 Ok(count > 0)
78 }
79 })
80 .await?;
81
82 if !exists && msg.link_id.is_none() {
83 ChannelUpdater::new(msg.chat_id)
84 .initialize(&self.conn.session, &self.conn.pool)
85 .await?;
86 }
87
88 Ok(Some(ClientEvent::Channel {
89 id: msg.chat_id,
90
91 event: ChannelEvent::Chat {
92 link_id: msg.link_id,
93
94 user_nickname: msg.author_nickname,
95 chat: msg.chatlog,
96 },
97 }))
98 }
99
100 async fn on_chat_read(&self, read: DecunRead) -> HandlerResult {
101 self.conn
102 .pool
103 .spawn({
104 let DecunRead {
105 chat_id: channel_id,
106 user_id,
107 watermark,
108 } = read.clone();
109
110 move |conn| {
111 use schema::user_profile;
112
113 diesel::update(user_profile::table)
114 .filter(
115 user_profile::channel_id
116 .eq(channel_id)
117 .and(user_profile::id.eq(user_id)),
118 )
119 .set(user_profile::watermark.eq(watermark))
120 .execute(conn)?;
121 Ok(())
122 }
123 })
124 .await?;
125
126 Ok(Some(ClientEvent::Channel {
127 id: read.chat_id,
128
129 event: ChannelEvent::ChatRead {
130 user_id: read.user_id,
131 log_id: read.watermark,
132 },
133 }))
134 }
135
136 async fn on_meta_change(&self, value: ChgMeta) -> HandlerResult {
137 self.conn
138 .pool
139 .spawn({
140 let value = value.clone();
141
142 move |conn| {
143 diesel::replace_into(channel_meta::table)
144 .values(ChannelMetaRow::from(value))
145 .execute(conn)?;
146
147 Ok(())
148 }
149 })
150 .await?;
151
152 Ok(Some(ClientEvent::Channel {
153 id: value.chat_id,
154 event: ChannelEvent::MetaChanged(value.meta),
155 }))
156 }
157
158 async fn on_chat_deleted(&self, value: SyncDlMsg) -> HandlerResult {
159 self.conn
160 .pool
161 .spawn({
162 let chatlog = value.chatlog.clone();
163
164 move |conn| {
165 diesel::update(chat::table)
166 .filter(
167 chat::log_id
168 .eq(chatlog.log_id)
169 .and(chat::channel_id.eq(chatlog.channel_id)),
170 )
171 .set(ChatUpdate::from(chatlog))
172 .execute(conn)?;
173
174 Ok(())
175 }
176 })
177 .await?;
178
179 Ok(Some(ClientEvent::Channel {
180 id: value.chatlog.channel_id,
181 event: ChannelEvent::ChatDeleted(value.chatlog),
182 }))
183 }
184
185 async fn on_channel_join(&self, sync_join: SyncJoin) -> HandlerResult {
186 ChannelUpdater::new(sync_join.chat_id)
187 .initialize(&self.conn.session, &self.conn.pool)
188 .await?;
189
190 Ok(Some(ClientEvent::Channel {
191 id: sync_join.chat_id,
192 event: ChannelEvent::Added {
193 chatlog: sync_join.chatlog,
194 },
195 }))
196 }
197
198 async fn on_left(&self, left: Left) -> HandlerResult {
199 let channel_id = left.chat_id;
200
201 self.conn
202 .pool
203 .spawn_transaction(move |conn| ChannelUpdater::new(channel_id).remove(conn))
204 .await?;
205
206 Ok(Some(ClientEvent::Channel {
207 id: channel_id,
208 event: ChannelEvent::Left,
209 }))
210 }
211}