Skip to main content

lb_rs/service/
activity.rs

1use crate::LocalLb;
2use crate::model::errors::LbResult;
3use crate::model::tree_like::TreeLike;
4use crate::service::events::Actor;
5use serde::{Deserialize, Serialize};
6use std::cmp::{self, Ordering};
7use std::collections::HashMap;
8use uuid::Uuid;
9
10impl LocalLb {
11    #[instrument(level = "debug", skip(self), err(Debug))]
12    pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
13        let db = self.ro_tx().await;
14        let db = db.db();
15
16        let mut scores = db.doc_events.get().iter().get_activity_metrics();
17        self.normalize(&mut scores);
18
19        scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
20
21        scores.truncate(10);
22
23        let mut result = Vec::new();
24        let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
25        for score in scores {
26            if tree.maybe_find(&score.id).is_none() {
27                continue;
28            }
29
30            if tree.calculate_deleted(&score.id)? {
31                continue;
32            }
33            if tree.in_pending_share(&score.id)? {
34                continue;
35            }
36            result.push(score.id);
37        }
38
39        Ok(result)
40    }
41
42    #[instrument(level = "debug", skip(self), err(Debug))]
43    pub async fn clear_suggested(&self) -> LbResult<()> {
44        let mut tx = self.begin_tx().await;
45        let db = tx.db();
46        db.doc_events.clear()?;
47        self.events.meta_changed(Actor::User);
48        Ok(())
49    }
50
51    #[instrument(level = "debug", skip(self), err(Debug))]
52    pub async fn clear_suggested_id(&self, id: Uuid) -> LbResult<()> {
53        let mut tx = self.begin_tx().await;
54        let db = tx.db();
55
56        let mut entries = db.doc_events.get().to_vec();
57        db.doc_events.clear()?;
58        entries.retain(|e| e.id() != id);
59        for entry in entries {
60            db.doc_events.push(entry)?;
61        }
62
63        Ok(())
64    }
65
66    pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
67        let mut tx = self.begin_tx().await;
68        let db = tx.db();
69
70        let max_stored_events = 1000;
71        let events = &db.doc_events;
72
73        if events.get().len() > max_stored_events {
74            db.doc_events.remove(0)?;
75        }
76        db.doc_events.push(event)?;
77        Ok(())
78    }
79
80    pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
81        let read_count_range = StatisticValueRange {
82            max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
83            min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
84        };
85
86        let write_count_range = StatisticValueRange {
87            max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
88            min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
89        };
90
91        let last_read_range = StatisticValueRange {
92            max: docs
93                .iter()
94                .map(|f| f.last_read_timestamp)
95                .max()
96                .unwrap_or_default(),
97            min: docs
98                .iter()
99                .map(|f| f.last_read_timestamp)
100                .min()
101                .unwrap_or_default(),
102        };
103        let last_write_range = StatisticValueRange {
104            max: docs
105                .iter()
106                .map(|f| f.last_write_timestamp)
107                .max()
108                .unwrap_or_default(),
109            min: docs
110                .iter()
111                .map(|f| f.last_write_timestamp)
112                .min()
113                .unwrap_or_default(),
114        };
115
116        docs.iter_mut().for_each(|f| {
117            f.read_count.normalize(read_count_range);
118            f.write_count.normalize(write_count_range);
119            f.last_read_timestamp.normalize(last_read_range);
120            f.last_write_timestamp.normalize(last_write_range);
121        });
122    }
123
124    /// hint to background processing pipelines whether or not a user is around
125    pub fn app_foregrounded(&self) {
126        let bg_lb = self.clone();
127        tokio::spawn(async move {
128            *bg_lb.user_last_seen.write().await = web_time::Instant::now();
129        });
130    }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Copy)]
134pub enum DocEvent {
135    Read(Uuid, i64),
136    Write(Uuid, i64),
137}
138impl DocEvent {
139    pub fn timestamp(&self) -> i64 {
140        match *self {
141            DocEvent::Read(_, timestamp) => timestamp,
142            DocEvent::Write(_, timestamp) => timestamp,
143        }
144    }
145    pub fn id(&self) -> Uuid {
146        match *self {
147            DocEvent::Read(id, _) => id,
148            DocEvent::Write(id, _) => id,
149        }
150    }
151}
152
153#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
154pub struct RankingWeights {
155    /// the freshness of a doc as determined by the last activity
156    pub temporality: i64,
157    /// the amount of write and read on a doc
158    pub io: i64,
159}
160
161impl Default for RankingWeights {
162    fn default() -> Self {
163        Self { temporality: 60, io: 40 }
164    }
165}
/// A single activity statistic: its raw observed value plus, once
/// [`StatisticValue::normalize`] has run, the min-max normalized form.
#[derive(Default, Copy, Clone, PartialEq)]
pub struct StatisticValue {
    /// the value as observed (a count or a timestamp)
    pub raw: i64,
    /// `raw` rescaled against the range given to `normalize`; `None` until then
    pub normalized: Option<f64>,
}

/// Orders primarily by `raw`.
///
/// Ties on `raw` are broken on `normalized` (`None` first, then numerically, with 0.0 and
/// -0.0 treated as equal). This makes `cmp` return `Equal` exactly when the derived
/// `PartialEq` reports equality, as the `Ord`/`Eq` contract requires. NaN is the one
/// exception; `normalize` never produces it.
impl Ord for StatisticValue {
    fn cmp(&self, other: &Self) -> Ordering {
        self.raw.cmp(&other.raw).then_with(|| match (self.normalized, other.normalized) {
            (None, None) => Ordering::Equal,
            (None, Some(_)) => Ordering::Less,
            (Some(_), None) => Ordering::Greater,
            // `==` first so that 0.0 and -0.0 agree with `PartialEq`
            (Some(a), Some(b)) if a == b => Ordering::Equal,
            (Some(a), Some(b)) => a.total_cmp(&b),
        })
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for StatisticValue {}

/// Bounds of one statistic across a set of documents, consumed by
/// [`StatisticValue::normalize`].
#[derive(Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Min-max normalizes `raw` against `range` and stores the result in `normalized`.
    ///
    /// `range.min.raw` maps to 0.0 and `range.max.raw` to 1.0. When the range is degenerate
    /// (`min == max`), the divisor is clamped to 1 to avoid dividing by zero, so a value
    /// equal to the bound normalizes to 0.0.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let mut range_distance = range.max.raw - range.min.raw;
        if range_distance == 0 {
            range_distance = 1;
        }
        let normalized = (self.raw - range.min.raw) as f64 / range_distance as f64;
        self.normalized = Some(normalized);
    }
}
201/// DocActivityMetrics stores key document activity features, which are used to recommend relevant documents to the user.
202/// Here's a walkthrough of the recommendation procedure: collect 1k most recent document events (write/read), use that activity to construct a DocActivtyMetrics struct for each document. Min-max normalizes the activity features, then rank the documents.
203#[derive(Default, Copy, Clone, PartialEq)]
204pub struct DocActivityMetrics {
205    pub id: Uuid,
206    /// the latest epoch timestamp that the user read a document
207    pub last_read_timestamp: StatisticValue,
208    /// the latest epoch timestamp that the user wrote a document
209    pub last_write_timestamp: StatisticValue,
210    /// the total number of times that a user reads a document
211    pub read_count: StatisticValue,
212    /// the total number of times that a user wrote a document
213    pub write_count: StatisticValue,
214}
215
216impl DocActivityMetrics {
217    pub fn score(&self, weights: RankingWeights) -> i64 {
218        let timestamp_weight = weights.temporality;
219        let io_count_weight = weights.io;
220
221        let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
222            + self.last_write_timestamp.normalized.unwrap_or_default())
223            * timestamp_weight as f64;
224
225        let io_score = (self.read_count.normalized.unwrap_or_default()
226            + self.write_count.normalized.unwrap_or_default())
227            * io_count_weight as f64;
228
229        (io_score + temporality_score).ceil() as i64
230    }
231}
232pub trait Stats {
233    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
234}
235impl<'a, T> Stats for T
236where
237    T: Iterator<Item = &'a DocEvent>,
238{
239    fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
240        let mut result = Vec::new();
241
242        let mut set = HashMap::new();
243        for event in self {
244            match set.get_mut(&event.id()) {
245                None => {
246                    set.insert(event.id(), vec![event]);
247                }
248                Some(events) => {
249                    events.push(event);
250                }
251            }
252        }
253
254        for (id, events) in set {
255            let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
256
257            let last_read = read_events
258                .clone()
259                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
260
261            let last_read = match last_read {
262                None => 0,
263                Some(x) => x.timestamp(),
264            };
265
266            let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
267
268            let last_write = write_events
269                .clone()
270                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
271            let last_write = match last_write {
272                None => 0,
273                Some(x) => x.timestamp(),
274            };
275
276            let metrics = DocActivityMetrics {
277                id,
278                last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
279                last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
280                read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
281                write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
282            };
283            result.push(metrics);
284        }
285
286        result
287    }
288}