headless_talk/channel/normal/
mod.rs

1pub mod user;
2
3use diesel::{
4    BoolExpressionMethods, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl,
5    SelectableHelper, SqliteConnection,
6};
7use talk_loco_client::talk::{
8    channel::ChannelMetaType,
9    session::{
10        channel::{
11            chat_on::{ChatOnChannelUsers, NormalChatOnChannel},
12            normal,
13        },
14        TalkSession,
15    },
16};
17
18use crate::{
19    conn::Conn,
20    database::{
21        model::{
22            channel::ChannelListRow,
23            user::{
24                normal::{NormalChannelUserModel, NormalChannelUserRow},
25                UserProfileModel, UserProfileRow, UserProfileUpdate,
26            },
27        },
28        schema::{channel_list, channel_meta, normal_channel_user, user_profile},
29        DatabasePool, PoolTaskError,
30    },
31    updater::channel::ChannelUpdater,
32    user::DisplayUser,
33    ClientResult,
34};
35
36use self::user::NormalChannelUser;
37
38use super::{ListChannelProfile, UserList};
39
40#[derive(Debug, Clone)]
41pub struct NormalChannel {
42    pub users: UserList<NormalChannelUser>,
43}
44
45#[derive(Debug, Clone, Copy)]
46pub struct NormalChannelOp<'a> {
47    id: i64,
48    conn: &'a Conn,
49}
50
51impl<'a> NormalChannelOp<'a> {
52    pub(crate) const fn new(id: i64, conn: &'a Conn) -> Self {
53        Self { id, conn }
54    }
55
56    pub const fn id(self) -> i64 {
57        self.id
58    }
59
60    pub async fn read_chat(self, watermark: i64) -> ClientResult<()> {
61        let id = self.id;
62
63        TalkSession(&self.conn.session)
64            .normal_channel(id)
65            .noti_read(watermark)
66            .await?;
67
68        self.conn
69            .pool
70            .spawn(move |conn| {
71                diesel::update(channel_list::table)
72                    .filter(channel_list::id.eq(id))
73                    .set(channel_list::last_seen_log_id.eq(watermark))
74                    .execute(conn)?;
75
76                Ok(())
77            })
78            .await?;
79
80        Ok(())
81    }
82
83    pub async fn leave(self, block: bool) -> ClientResult<()> {
84        let id = self.id;
85
86        TalkSession(&self.conn.session)
87            .normal_channel(id)
88            .leave(block)
89            .await?;
90
91        self.conn
92            .pool
93            .spawn_transaction(move |conn| ChannelUpdater::new(id).remove(conn))
94            .await?;
95
96        Ok(())
97    }
98}
99
100pub(super) async fn load_list_profile(
101    pool: &DatabasePool,
102    display_users: &[DisplayUser],
103    row: &ChannelListRow,
104) -> Result<ListChannelProfile, PoolTaskError> {
105    let id = row.id;
106
107    let (name, image_meta) = pool
108        .spawn(move |conn| {
109            let name: Option<String> = channel_meta::table
110                .filter(
111                    channel_meta::channel_id
112                        .eq(id)
113                        .and(channel_meta::type_.eq(ChannelMetaType::Title as i32)),
114                )
115                .select(channel_meta::content)
116                .first(conn)
117                .optional()?;
118
119            let image_url: Option<String> = channel_meta::table
120                .filter(
121                    channel_meta::channel_id
122                        .eq(id)
123                        .and(channel_meta::type_.eq(ChannelMetaType::Profile as i32)),
124                )
125                .select(channel_meta::content)
126                .first(conn)
127                .optional()?;
128
129            Ok((name, image_url))
130        })
131        .await?;
132
133    let name = name.unwrap_or_else(|| {
134        display_users
135            .iter()
136            .map(|user| user.profile.nickname.as_str())
137            .collect::<Vec<&str>>()
138            .join(", ")
139    });
140
141    Ok(ListChannelProfile {
142        name,
143        image: image_meta.and_then(|meta| serde_json::from_str(&meta).ok()),
144    })
145}
146
147pub(crate) async fn load_channel(
148    id: i64,
149    conn: &Conn,
150    normal: NormalChatOnChannel,
151) -> ClientResult<NormalChannel> {
152    let users = conn
153        .pool
154        .spawn_transaction(move |conn| {
155            let mut user_list: UserList<NormalChannelUser> = UserList::new();
156
157            match normal.users {
158                ChatOnChannelUsers::Ids(ids) => {
159                    for user_id in ids.iter().copied() {
160                        user_list.push((user_id, get_channel_user(conn, id, user_id)?));
161                    }
162                }
163
164                ChatOnChannelUsers::Users(users) => {
165                    update_channel_users(conn, id, &users)?;
166
167                    for user_id in users.iter().map(|user| user.user_id) {
168                        user_list.push((user_id, get_channel_user(conn, id, user_id)?));
169                    }
170                }
171            }
172
173            Ok(user_list)
174        })
175        .await?;
176
177    Ok(NormalChannel { users })
178}
179
180fn get_channel_user(
181    conn: &mut SqliteConnection,
182    id: i64,
183    user_id: i64,
184) -> Result<NormalChannelUser, PoolTaskError> {
185    let (profile, normal) = user_profile::table
186        .filter(
187            user_profile::channel_id
188                .eq(id)
189                .and(user_profile::id.eq(user_id)),
190        )
191        .inner_join(
192            normal_channel_user::table.on(normal_channel_user::channel_id
193                .eq(user_profile::channel_id)
194                .and(normal_channel_user::id.eq(user_profile::id))),
195        )
196        .select((
197            UserProfileModel::as_select(),
198            NormalChannelUserModel::as_select(),
199        ))
200        .first::<(UserProfileModel, NormalChannelUserModel)>(conn)?;
201
202    Ok(NormalChannelUser::from_models(profile, normal))
203}
204
205fn update_channel_users(
206    conn: &mut SqliteConnection,
207    id: i64,
208    users: &[normal::user::User],
209) -> Result<(), PoolTaskError> {
210    for user in users {
211        diesel::insert_into(user_profile::table)
212            .values(UserProfileRow::from_normal_user(id, user))
213            .on_conflict((user_profile::id, user_profile::channel_id))
214            .do_update()
215            .set(UserProfileUpdate::from(user))
216            .execute(conn)?;
217    }
218
219    diesel::replace_into(normal_channel_user::table)
220        .values(
221            users
222                .iter()
223                .map(|user| NormalChannelUserRow::from_user(id, user))
224                .collect::<Vec<_>>(),
225        )
226        .execute(conn)?;
227
228    Ok(())
229}