// headless_talk/
// lib.rs

1pub mod channel;
2mod conn;
3mod constants;
4mod database;
5pub mod event;
6pub mod handler;
7pub mod init;
8mod task;
9mod updater;
10pub mod user;
11
use channel::{
    load_list_item,
    normal::{self, NormalChannelOp},
    open::OpenChannelOp,
    ChannelListItem, ChannelOp, ClientChannel,
};
use conn::Conn;
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};

use database::{
    model::channel::ChannelListRow,
    schema::{channel_list, user_profile},
    PoolTaskError,
};
use talk_loco_client::{
    talk::session::{channel::chat_on::ChatOnChannelType, TalkSession},
    RequestError,
};
use task::BackgroundTask;
use thiserror::Error;

pub use talk_loco_client;

35#[derive(Debug)]
36pub struct HeadlessTalk {
37    pub conn: Conn,
38
39    pub _ping_task: BackgroundTask,
40    pub _stream_task: BackgroundTask,
41}
42
43impl HeadlessTalk {
44    pub fn user_id(&self) -> i64 {
45        self.conn.user_id
46    }
47
48    pub async fn channel_list(&self) -> Result<Vec<(i64, ChannelListItem)>, PoolTaskError> {
49        let rows = self
50            .conn
51            .pool
52            .spawn(|conn| {
53                let rows = channel_list::table
54                    .select(channel_list::all_columns)
55                    .load::<ChannelListRow>(conn)?;
56
57                Ok(rows)
58            })
59            .await?;
60
61        let mut list = Vec::with_capacity(rows.capacity());
62
63        for row in rows {
64            if let Some(list_item) = load_list_item(&self.conn.pool, &row).await? {
65                list.push((row.id, list_item))
66            }
67        }
68
69        Ok(list)
70    }
71
72    pub async fn load_channel(&self, id: i64) -> ClientResult<Option<ClientChannel>> {
73        let last_seen_log_id = self
74            .conn
75            .pool
76            .spawn(move |conn| {
77                let last_seen_log_id: Option<i64> = channel_list::table
78                    .filter(channel_list::id.eq(id))
79                    .select(channel_list::last_seen_log_id)
80                    .first::<Option<i64>>(conn)?;
81
82                Ok(last_seen_log_id)
83            })
84            .await?;
85
86        let res = TalkSession(&self.conn.session)
87            .channel(id)
88            .chat_on(last_seen_log_id)
89            .await?;
90
91        if let (Some(active_user_ids), Some(watermarks)) = (res.active_user_ids, res.watermarks) {
92            let active_user_count = active_user_ids.len() as i32;
93            let watermark_iter = active_user_ids.into_iter().zip(watermarks.into_iter());
94
95            self.conn
96                .pool
97                .spawn_transaction(move |conn| {
98                    diesel::update(channel_list::table)
99                        .filter(channel_list::id.eq(id))
100                        .set(channel_list::active_user_count.eq(active_user_count))
101                        .execute(conn)?;
102
103                    for (user_id, watermark) in watermark_iter {
104                        diesel::update(user_profile::table)
105                            .filter(
106                                user_profile::channel_id
107                                    .eq(id)
108                                    .and(user_profile::id.eq(user_id)),
109                            )
110                            .set(user_profile::watermark.eq(watermark))
111                            .execute(conn)?;
112                    }
113
114                    Ok(())
115                })
116                .await?;
117        }
118
119        Ok(match res.channel_type {
120            ChatOnChannelType::DirectChat(normal)
121            | ChatOnChannelType::MultiChat(normal)
122            | ChatOnChannelType::MemoChat(normal) => Some(ClientChannel::Normal(
123                normal::load_channel(id, &self.conn, normal).await?,
124            )),
125
126            _ => None,
127        })
128    }
129
130    pub fn channel(&self, id: i64) -> ChannelOp<'_> {
131        ChannelOp::new(id, &self.conn)
132    }
133
134    pub fn normal_channel(&self, id: i64) -> NormalChannelOp<'_> {
135        NormalChannelOp::new(id, &self.conn)
136    }
137
138    pub fn open_channel(&self, id: i64, link_id: i64) -> OpenChannelOp<'_> {
139        OpenChannelOp::new(id, link_id, &self.conn)
140    }
141
142    pub async fn set_status(&self, client_status: ClientStatus) -> ClientResult<()> {
143        TalkSession(&self.conn.session)
144            .set_status(client_status as _)
145            .await?;
146
147        Ok(())
148    }
149}
150
/// Client status passed to `HeadlessTalk::set_status`.
///
/// The type is `#[repr(i32)]` with explicit discriminants; `set_status` casts the
/// value with `as`, so these numbers are what is sent.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientStatus {
    /// Sent as `1`.
    Unlocked = 1,
    /// Sent as `2`.
    Locked = 2,
}

158pub type ClientResult<T> = Result<T, ClientError>;
159
160#[derive(Debug, Error)]
161#[error(transparent)]
162pub enum ClientError {
163    Request(#[from] RequestError),
164    Database(#[from] PoolTaskError),
165}