lamprey_backend/
state.rs

1use std::{
2    ops::Deref,
3    sync::{Arc, Weak},
4};
5
6use common::v1::types::{
7    voice::SfuCommand, AuditLogEntry, ChannelId, Media, Message, RoomId, SfuId, UserId,
8};
9use common::v1::types::{MessageSync, MessageType};
10use dashmap::DashMap;
11
12use sqlx::PgPool;
13use tokio::sync::broadcast::Sender;
14use tracing::error;
15use url::Url;
16
17use crate::{
18    config::Config,
19    data::{postgres::Postgres, Data},
20    services::Services,
21    sync::Connection,
22    Error, Result,
23};
24
25pub struct ServerStateInner {
26    pub config: Config,
27    pub pool: PgPool,
28    pub services: Weak<Services>,
29
30    // this is fine probably
31    pub sushi: Sender<MessageSync>,
32    // channel_user: Arc<DashMap<UserId, (Sender<MessageServer>, Receiver<MessageServer>)>>,
33    pub sushi_sfu: Sender<SfuCommand>,
34
35    // TODO: write a wrapper around this (media is kind of like this?)
36    pub blobs: opendal::Operator,
37
38    pub sfus: DashMap<SfuId, ()>,
39    pub thread_to_sfu: DashMap<ChannelId, SfuId>,
40}
41
42pub struct ServerState {
43    pub inner: Arc<ServerStateInner>,
44    pub services: Arc<Services>,
45
46    // TODO: limit number of connections per user, clean up old/unused entries
47    pub syncers: Arc<DashMap<String, Connection>>,
48}
49
50impl ServerStateInner {
51    pub fn data(&self) -> Box<dyn Data> {
52        Box::new(Postgres {
53            pool: self.pool.clone(),
54        })
55    }
56
57    pub fn services(&self) -> Arc<Services> {
58        self.services
59            .upgrade()
60            .expect("services should always exist while serverstateinner is alive")
61    }
62
63    // fn acquire_data(&self) -> Box<dyn Data> {
64    //     Box::new(Postgres {
65    //         pool: self.pool.clone(),
66    //     })
67    // }
68
69    pub async fn broadcast_room(
70        &self,
71        _room_id: RoomId,
72        _user_id: UserId, // TODO: remove
73        msg: MessageSync,
74    ) -> Result<()> {
75        let _ = self.sushi.send(msg);
76        Ok(())
77    }
78
79    pub async fn broadcast_channel(
80        &self,
81        _thread_id: ChannelId,
82        _user_id: UserId,
83        msg: MessageSync,
84    ) -> Result<()> {
85        let _ = self.sushi.send(msg);
86        Ok(())
87    }
88
89    pub fn broadcast(&self, msg: MessageSync) -> Result<()> {
90        let _ = self.sushi.send(msg);
91        Ok(())
92    }
93
94    pub fn get_s3_url(&self, path: &str) -> Result<Url> {
95        let mut u = Url::parse("s3://")?;
96        u.set_host(Some(&self.config.s3.bucket))?;
97        u.set_path(path);
98        Ok(u)
99    }
100
101    /// presigns every relevant url in a piece of media
102    pub async fn presign(&self, _media: &mut Media) -> Result<()> {
103        // in the past, media was served directly from s3
104        // this doesn't do anything, but i'll keep it just in case
105        Ok(())
106    }
107
108    pub async fn audit_log_append(&self, entry: AuditLogEntry) -> Result<()> {
109        self.data().audit_logs_room_append(entry.clone()).await?;
110        self.broadcast_room(
111            entry.room_id,
112            entry.user_id,
113            MessageSync::AuditLogEntryCreate { entry },
114        )
115        .await?;
116        Ok(())
117    }
118
119    /// presigns every relevant url in a Message
120    pub async fn presign_message(&self, message: &mut Message) -> Result<()> {
121        match &mut message.message_type {
122            MessageType::DefaultMarkdown(message) => {
123                for media in &mut message.attachments {
124                    self.presign(media).await?;
125                }
126                for emb in &mut message.embeds {
127                    if let Some(m) = &mut emb.media {
128                        self.presign(m).await?;
129                    }
130                    if let Some(m) = &mut emb.author_avatar {
131                        self.presign(m).await?;
132                    }
133                    if let Some(m) = &mut emb.site_avatar {
134                        self.presign(m).await?;
135                    }
136                }
137            }
138            _ => {}
139        }
140        Ok(())
141    }
142
143    /// select the "best" sfu and pair it with this thread id. return the existing sfu id if it exists.
144    ///
145    /// currently "best" means the sfu with least load in terms of # of threads using it
146    pub async fn alloc_sfu(&self, thread_id: ChannelId) -> Result<SfuId> {
147        if let Some(existing) = self.thread_to_sfu.get(&thread_id) {
148            return Ok(*existing);
149        }
150
151        let sfu_thread_counts = DashMap::<SfuId, u64>::new();
152        for i in &self.sfus {
153            sfu_thread_counts.insert(*i.key(), 0);
154        }
155        for i in &self.thread_to_sfu {
156            *sfu_thread_counts.get_mut(i.value()).unwrap() += 1;
157        }
158        let mut sorted: Vec<_> = sfu_thread_counts.into_iter().collect();
159        sorted.sort_by_key(|(_, count)| *count);
160        if let Some((chosen, _)) = sorted.first() {
161            self.thread_to_sfu.insert(thread_id, *chosen);
162            let thread = self.services().channels.get(thread_id, None).await?;
163            self.sushi_sfu
164                .send(SfuCommand::Thread {
165                    thread: thread.into(),
166                })
167                .unwrap();
168            Ok(*chosen)
169        } else {
170            error!("no available sfu");
171            Err(Error::BadStatic("no available sfu"))
172        }
173    }
174}
175
176impl ServerState {
177    pub fn new(config: Config, pool: PgPool, blobs: opendal::Operator) -> Self {
178        // a bit hacky for now since i need to work around the existing ServerState
179        // though i probably need some way to access global state/services from within them anyways
180        let services = Arc::new_cyclic(|weak| {
181            let inner = Arc::new(ServerStateInner {
182                config,
183                pool,
184                services: weak.to_owned(),
185                blobs,
186
187                // maybe i should increase the limit at some point? or make it unlimited?
188                sushi: tokio::sync::broadcast::channel(100).0,
189                sushi_sfu: tokio::sync::broadcast::channel(100).0,
190
191                sfus: DashMap::new(),
192                thread_to_sfu: DashMap::new(),
193            });
194            Services::new(inner.clone())
195        });
196        Self {
197            inner: services.state.clone(),
198            syncers: Arc::new(DashMap::new()),
199            // channel_user: Arc::new(DashMap::new()),
200            services,
201        }
202    }
203
204    pub fn config(&self) -> &Config {
205        &self.inner.config
206    }
207
208    pub fn data(&self) -> Box<dyn Data> {
209        self.inner.data()
210    }
211
212    pub fn services(self: &Arc<Self>) -> Arc<Services> {
213        self.services.clone()
214    }
215}
216
217impl Deref for ServerState {
218    type Target = ServerStateInner;
219
220    fn deref(&self) -> &Self::Target {
221        &self.inner
222    }
223}