headless_talk/channel/normal/
mod.rs1pub mod user;
2
3use diesel::{
4 BoolExpressionMethods, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl,
5 SelectableHelper, SqliteConnection,
6};
7use talk_loco_client::talk::{
8 channel::ChannelMetaType,
9 session::{
10 channel::{
11 chat_on::{ChatOnChannelUsers, NormalChatOnChannel},
12 normal,
13 },
14 TalkSession,
15 },
16};
17
18use crate::{
19 conn::Conn,
20 database::{
21 model::{
22 channel::ChannelListRow,
23 user::{
24 normal::{NormalChannelUserModel, NormalChannelUserRow},
25 UserProfileModel, UserProfileRow, UserProfileUpdate,
26 },
27 },
28 schema::{channel_list, channel_meta, normal_channel_user, user_profile},
29 DatabasePool, PoolTaskError,
30 },
31 updater::channel::ChannelUpdater,
32 user::DisplayUser,
33 ClientResult,
34};
35
36use self::user::NormalChannelUser;
37
38use super::{ListChannelProfile, UserList};
39
40#[derive(Debug, Clone)]
41pub struct NormalChannel {
42 pub users: UserList<NormalChannelUser>,
43}
44
45#[derive(Debug, Clone, Copy)]
46pub struct NormalChannelOp<'a> {
47 id: i64,
48 conn: &'a Conn,
49}
50
51impl<'a> NormalChannelOp<'a> {
52 pub(crate) const fn new(id: i64, conn: &'a Conn) -> Self {
53 Self { id, conn }
54 }
55
56 pub const fn id(self) -> i64 {
57 self.id
58 }
59
60 pub async fn read_chat(self, watermark: i64) -> ClientResult<()> {
61 let id = self.id;
62
63 TalkSession(&self.conn.session)
64 .normal_channel(id)
65 .noti_read(watermark)
66 .await?;
67
68 self.conn
69 .pool
70 .spawn(move |conn| {
71 diesel::update(channel_list::table)
72 .filter(channel_list::id.eq(id))
73 .set(channel_list::last_seen_log_id.eq(watermark))
74 .execute(conn)?;
75
76 Ok(())
77 })
78 .await?;
79
80 Ok(())
81 }
82
83 pub async fn leave(self, block: bool) -> ClientResult<()> {
84 let id = self.id;
85
86 TalkSession(&self.conn.session)
87 .normal_channel(id)
88 .leave(block)
89 .await?;
90
91 self.conn
92 .pool
93 .spawn_transaction(move |conn| ChannelUpdater::new(id).remove(conn))
94 .await?;
95
96 Ok(())
97 }
98}
99
100pub(super) async fn load_list_profile(
101 pool: &DatabasePool,
102 display_users: &[DisplayUser],
103 row: &ChannelListRow,
104) -> Result<ListChannelProfile, PoolTaskError> {
105 let id = row.id;
106
107 let (name, image_meta) = pool
108 .spawn(move |conn| {
109 let name: Option<String> = channel_meta::table
110 .filter(
111 channel_meta::channel_id
112 .eq(id)
113 .and(channel_meta::type_.eq(ChannelMetaType::Title as i32)),
114 )
115 .select(channel_meta::content)
116 .first(conn)
117 .optional()?;
118
119 let image_url: Option<String> = channel_meta::table
120 .filter(
121 channel_meta::channel_id
122 .eq(id)
123 .and(channel_meta::type_.eq(ChannelMetaType::Profile as i32)),
124 )
125 .select(channel_meta::content)
126 .first(conn)
127 .optional()?;
128
129 Ok((name, image_url))
130 })
131 .await?;
132
133 let name = name.unwrap_or_else(|| {
134 display_users
135 .iter()
136 .map(|user| user.profile.nickname.as_str())
137 .collect::<Vec<&str>>()
138 .join(", ")
139 });
140
141 Ok(ListChannelProfile {
142 name,
143 image: image_meta.and_then(|meta| serde_json::from_str(&meta).ok()),
144 })
145}
146
147pub(crate) async fn load_channel(
148 id: i64,
149 conn: &Conn,
150 normal: NormalChatOnChannel,
151) -> ClientResult<NormalChannel> {
152 let users = conn
153 .pool
154 .spawn_transaction(move |conn| {
155 let mut user_list: UserList<NormalChannelUser> = UserList::new();
156
157 match normal.users {
158 ChatOnChannelUsers::Ids(ids) => {
159 for user_id in ids.iter().copied() {
160 user_list.push((user_id, get_channel_user(conn, id, user_id)?));
161 }
162 }
163
164 ChatOnChannelUsers::Users(users) => {
165 update_channel_users(conn, id, &users)?;
166
167 for user_id in users.iter().map(|user| user.user_id) {
168 user_list.push((user_id, get_channel_user(conn, id, user_id)?));
169 }
170 }
171 }
172
173 Ok(user_list)
174 })
175 .await?;
176
177 Ok(NormalChannel { users })
178}
179
180fn get_channel_user(
181 conn: &mut SqliteConnection,
182 id: i64,
183 user_id: i64,
184) -> Result<NormalChannelUser, PoolTaskError> {
185 let (profile, normal) = user_profile::table
186 .filter(
187 user_profile::channel_id
188 .eq(id)
189 .and(user_profile::id.eq(user_id)),
190 )
191 .inner_join(
192 normal_channel_user::table.on(normal_channel_user::channel_id
193 .eq(user_profile::channel_id)
194 .and(normal_channel_user::id.eq(user_profile::id))),
195 )
196 .select((
197 UserProfileModel::as_select(),
198 NormalChannelUserModel::as_select(),
199 ))
200 .first::<(UserProfileModel, NormalChannelUserModel)>(conn)?;
201
202 Ok(NormalChannelUser::from_models(profile, normal))
203}
204
205fn update_channel_users(
206 conn: &mut SqliteConnection,
207 id: i64,
208 users: &[normal::user::User],
209) -> Result<(), PoolTaskError> {
210 for user in users {
211 diesel::insert_into(user_profile::table)
212 .values(UserProfileRow::from_normal_user(id, user))
213 .on_conflict((user_profile::id, user_profile::channel_id))
214 .do_update()
215 .set(UserProfileUpdate::from(user))
216 .execute(conn)?;
217 }
218
219 diesel::replace_into(normal_channel_user::table)
220 .values(
221 users
222 .iter()
223 .map(|user| NormalChannelUserRow::from_user(id, user))
224 .collect::<Vec<_>>(),
225 )
226 .execute(conn)?;
227
228 Ok(())
229}