headless_talk/handler/
mod.rs

1pub mod error;
2
3use diesel::{dsl::exists, BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
4use futures_loco_protocol::loco_protocol::command::BoxedCommand;
5use talk_loco_client::talk::stream::{
6    command::{ChgMeta, DecunRead, Kickout, Left, Msg, SyncDlMsg, SyncJoin},
7    StreamCommand,
8};
9
10use crate::{
11    conn::Conn,
12    database::{
13        model::{
14            channel::meta::ChannelMetaRow,
15            chat::{ChatRow, ChatUpdate},
16        },
17        schema::{self, channel_list, channel_meta, chat},
18    },
19    event::{channel::ChannelEvent, ClientEvent},
20    updater::channel::ChannelUpdater,
21};
22
23use self::error::HandlerError;
24
25type HandlerResult = Result<Option<ClientEvent>, HandlerError>;
26
27#[derive(Debug, Clone)]
28pub(crate) struct SessionHandler {
29    conn: Conn,
30}
31
32impl SessionHandler {
33    pub fn new(conn: Conn) -> Self {
34        Self { conn }
35    }
36
37    pub async fn handle(&self, read: BoxedCommand) -> HandlerResult {
38        match StreamCommand::deserialize_from(read)? {
39            StreamCommand::Kickout(kickout) => self.on_kickout(kickout).await,
40            StreamCommand::SwitchServer => self.on_switch_server().await,
41            StreamCommand::Chat(msg) => self.on_chat(msg).await,
42            StreamCommand::ChatRead(read) => self.on_chat_read(read).await,
43            StreamCommand::ChangeMeta(meta) => self.on_meta_change(meta).await,
44            StreamCommand::SyncChatDeletion(deletion) => self.on_chat_deleted(deletion).await,
45            StreamCommand::SyncChannelJoin(sync_join) => self.on_channel_join(sync_join).await,
46            StreamCommand::Left(left) => self.on_left(left).await,
47
48            _ => Ok(None),
49        }
50    }
51
52    async fn on_kickout(&self, kickout: Kickout) -> HandlerResult {
53        Ok(Some(ClientEvent::Kickout(kickout.reason)))
54    }
55
56    async fn on_switch_server(&self) -> HandlerResult {
57        Ok(Some(ClientEvent::SwitchServer))
58    }
59
60    async fn on_chat(&self, msg: Msg) -> HandlerResult {
61        let exists = self
62            .conn
63            .pool
64            .spawn({
65                let row = ChatRow::from_chatlog(msg.chatlog.clone(), None);
66
67                move |conn| {
68                    let count = diesel::select(exists(
69                        channel_list::table.filter(channel_list::id.eq(row.channel_id)),
70                    ))
71                    .execute(conn)?;
72
73                    diesel::replace_into(chat::table)
74                        .values(row)
75                        .execute(conn)?;
76
77                    Ok(count > 0)
78                }
79            })
80            .await?;
81
82        if !exists && msg.link_id.is_none() {
83            ChannelUpdater::new(msg.chat_id)
84                .initialize(&self.conn.session, &self.conn.pool)
85                .await?;
86        }
87
88        Ok(Some(ClientEvent::Channel {
89            id: msg.chat_id,
90
91            event: ChannelEvent::Chat {
92                link_id: msg.link_id,
93
94                user_nickname: msg.author_nickname,
95                chat: msg.chatlog,
96            },
97        }))
98    }
99
100    async fn on_chat_read(&self, read: DecunRead) -> HandlerResult {
101        self.conn
102            .pool
103            .spawn({
104                let DecunRead {
105                    chat_id: channel_id,
106                    user_id,
107                    watermark,
108                } = read.clone();
109
110                move |conn| {
111                    use schema::user_profile;
112
113                    diesel::update(user_profile::table)
114                        .filter(
115                            user_profile::channel_id
116                                .eq(channel_id)
117                                .and(user_profile::id.eq(user_id)),
118                        )
119                        .set(user_profile::watermark.eq(watermark))
120                        .execute(conn)?;
121                    Ok(())
122                }
123            })
124            .await?;
125
126        Ok(Some(ClientEvent::Channel {
127            id: read.chat_id,
128
129            event: ChannelEvent::ChatRead {
130                user_id: read.user_id,
131                log_id: read.watermark,
132            },
133        }))
134    }
135
136    async fn on_meta_change(&self, value: ChgMeta) -> HandlerResult {
137        self.conn
138            .pool
139            .spawn({
140                let value = value.clone();
141
142                move |conn| {
143                    diesel::replace_into(channel_meta::table)
144                        .values(ChannelMetaRow::from(value))
145                        .execute(conn)?;
146
147                    Ok(())
148                }
149            })
150            .await?;
151
152        Ok(Some(ClientEvent::Channel {
153            id: value.chat_id,
154            event: ChannelEvent::MetaChanged(value.meta),
155        }))
156    }
157
158    async fn on_chat_deleted(&self, value: SyncDlMsg) -> HandlerResult {
159        self.conn
160            .pool
161            .spawn({
162                let chatlog = value.chatlog.clone();
163
164                move |conn| {
165                    diesel::update(chat::table)
166                        .filter(
167                            chat::log_id
168                                .eq(chatlog.log_id)
169                                .and(chat::channel_id.eq(chatlog.channel_id)),
170                        )
171                        .set(ChatUpdate::from(chatlog))
172                        .execute(conn)?;
173
174                    Ok(())
175                }
176            })
177            .await?;
178
179        Ok(Some(ClientEvent::Channel {
180            id: value.chatlog.channel_id,
181            event: ChannelEvent::ChatDeleted(value.chatlog),
182        }))
183    }
184
185    async fn on_channel_join(&self, sync_join: SyncJoin) -> HandlerResult {
186        ChannelUpdater::new(sync_join.chat_id)
187            .initialize(&self.conn.session, &self.conn.pool)
188            .await?;
189
190        Ok(Some(ClientEvent::Channel {
191            id: sync_join.chat_id,
192            event: ChannelEvent::Added {
193                chatlog: sync_join.chatlog,
194            },
195        }))
196    }
197
198    async fn on_left(&self, left: Left) -> HandlerResult {
199        let channel_id = left.chat_id;
200
201        self.conn
202            .pool
203            .spawn_transaction(move |conn| ChannelUpdater::new(channel_id).remove(conn))
204            .await?;
205
206        Ok(Some(ClientEvent::Channel {
207            id: channel_id,
208            event: ChannelEvent::Left,
209        }))
210    }
211}