Skip to main content

lb_rs/service/
activity.rs

1use crate::Lb;
2use crate::model::errors::LbResult;
3use crate::model::tree_like::TreeLike;
4use serde::{Deserialize, Serialize};
5use std::cmp::{self, Ordering};
6use std::collections::HashMap;
7use uuid::Uuid;
8
9impl Lb {
10    #[instrument(level = "debug", skip(self), err(Debug))]
11    pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
12        let db = self.ro_tx().await;
13        let db = db.db();
14
15        let mut scores = db.doc_events.get().iter().get_activity_metrics();
16        self.normalize(&mut scores);
17
18        scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
19
20        scores.truncate(10);
21
22        let mut result = Vec::new();
23        let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
24        for score in scores {
25            if tree.maybe_find(&score.id).is_none() {
26                continue;
27            }
28
29            if tree.calculate_deleted(&score.id)? {
30                continue;
31            }
32            if tree.in_pending_share(&score.id)? {
33                continue;
34            }
35            result.push(score.id);
36        }
37
38        Ok(result)
39    }
40
41    #[instrument(level = "debug", skip(self), err(Debug))]
42    pub async fn clear_suggested(&self) -> LbResult<()> {
43        let mut tx = self.begin_tx().await;
44        let db = tx.db();
45        db.doc_events.clear()?;
46        Ok(())
47    }
48
49    #[instrument(level = "debug", skip(self), err(Debug))]
50    pub async fn clear_suggested_id(&self, id: Uuid) -> LbResult<()> {
51        let mut tx = self.begin_tx().await;
52        let db = tx.db();
53
54        let mut entries = db.doc_events.get().to_vec();
55        db.doc_events.clear()?;
56        entries.retain(|e| e.id() != id);
57        for entry in entries {
58            db.doc_events.push(entry)?;
59        }
60
61        Ok(())
62    }
63
64    pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
65        let mut tx = self.begin_tx().await;
66        let db = tx.db();
67
68        let max_stored_events = 1000;
69        let events = &db.doc_events;
70
71        if events.get().len() > max_stored_events {
72            db.doc_events.remove(0)?;
73        }
74        db.doc_events.push(event)?;
75        Ok(())
76    }
77
78    pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
79        let read_count_range = StatisticValueRange {
80            max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
81            min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
82        };
83
84        let write_count_range = StatisticValueRange {
85            max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
86            min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
87        };
88
89        let last_read_range = StatisticValueRange {
90            max: docs
91                .iter()
92                .map(|f| f.last_read_timestamp)
93                .max()
94                .unwrap_or_default(),
95            min: docs
96                .iter()
97                .map(|f| f.last_read_timestamp)
98                .min()
99                .unwrap_or_default(),
100        };
101        let last_write_range = StatisticValueRange {
102            max: docs
103                .iter()
104                .map(|f| f.last_write_timestamp)
105                .max()
106                .unwrap_or_default(),
107            min: docs
108                .iter()
109                .map(|f| f.last_write_timestamp)
110                .min()
111                .unwrap_or_default(),
112        };
113
114        docs.iter_mut().for_each(|f| {
115            f.read_count.normalize(read_count_range);
116            f.write_count.normalize(write_count_range);
117            f.last_read_timestamp.normalize(last_read_range);
118            f.last_write_timestamp.normalize(last_write_range);
119        });
120    }
121
122    /// hint to background processing pipelines whether or not a user is around
123    pub fn app_foregrounded(&self) {
124        let bg_lb = self.clone();
125        tokio::spawn(async move {
126            *bg_lb.user_last_seen.write().await = web_time::Instant::now();
127        });
128    }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Copy)]
132pub enum DocEvent {
133    Read(Uuid, i64),
134    Write(Uuid, i64),
135}
136impl DocEvent {
137    pub fn timestamp(&self) -> i64 {
138        match *self {
139            DocEvent::Read(_, timestamp) => timestamp,
140            DocEvent::Write(_, timestamp) => timestamp,
141        }
142    }
143    pub fn id(&self) -> Uuid {
144        match *self {
145            DocEvent::Read(id, _) => id,
146            DocEvent::Write(id, _) => id,
147        }
148    }
149}
150
/// Relative weights given to each family of activity features when a document is scored
/// by `DocActivityMetrics::score`.
#[derive(Clone, Copy, Debug)]
pub struct RankingWeights {
    /// Weight on recency: multiplies the sum of the normalized last-read and last-write
    /// timestamps.
    pub temporality: i64,
    /// Weight on volume: multiplies the sum of the normalized read and write counts.
    pub io: i64,
}

impl Default for RankingWeights {
    /// Favors recency over volume, 60 to 40.
    fn default() -> Self {
        let temporality = 60;
        let io = 40;
        RankingWeights { temporality, io }
    }
}
/// One activity feature of a document: the measured value plus, once
/// [`StatisticValue::normalize`] has run, its min-max normalized counterpart.
#[derive(Debug, Default, Copy, Clone)]
pub struct StatisticValue {
    /// The measured value (an event count or a timestamp).
    pub raw: i64,
    /// `raw` rescaled by [`StatisticValue::normalize`]; `None` until then.
    pub normalized: Option<f64>,
}

/// Orders primarily by `raw`. `normalized` only breaks ties: `None` sorts before `Some`,
/// and floats compare with [`f64::total_cmp`], so NaN has a defined place.
///
/// `PartialEq` is defined in terms of this ordering so that `==`, `Eq`, `Ord` and
/// `PartialOrd` all agree, as the `Ord` contract requires. A derived `PartialEq` would
/// compare the `f64` with `==`, so two values with equal `raw` could compare `Equal` yet
/// be unequal. It would also make `Eq` non-reflexive for NaN.
impl Ord for StatisticValue {
    fn cmp(&self, other: &Self) -> Ordering {
        self.raw.cmp(&other.raw).then_with(|| match (self.normalized, other.normalized) {
            (None, None) => Ordering::Equal,
            (None, Some(_)) => Ordering::Less,
            (Some(_), None) => Ordering::Greater,
            (Some(a), Some(b)) => a.total_cmp(&b),
        })
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for StatisticValue {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for StatisticValue {}

/// Bounds of one feature across a set of documents, consumed by
/// [`StatisticValue::normalize`].
#[derive(Debug, Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Min-max normalizes `raw` against `range` and stores the result in `normalized`.
    ///
    /// Computes `(raw - min) / (max - min)`. A degenerate range (`max == min`) uses a
    /// divisor of 1 instead of dividing by zero, so a value equal to `min` maps to 0.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        // Widen to i128 so extreme inputs cannot overflow the subtractions. For
        // differences that fit in i64 the result is identical to doing the math in i64.
        let offset = i128::from(self.raw) - i128::from(range.min.raw);
        let span = match i128::from(range.max.raw) - i128::from(range.min.raw) {
            0 => 1,
            distance => distance,
        };
        self.normalized = Some(offset as f64 / span as f64);
    }
}
199/// DocActivityMetrics stores key document activity features, which are used to recommend relevant documents to the user.
200/// Here's a walkthrough of the recommendation procedure: collect 1k most recent document events (write/read), use that activity to construct a DocActivtyMetrics struct for each document. Min-max normalizes the activity features, then rank the documents.
201#[derive(Default, Copy, Clone, PartialEq)]
202pub struct DocActivityMetrics {
203    pub id: Uuid,
204    /// the latest epoch timestamp that the user read a document
205    pub last_read_timestamp: StatisticValue,
206    /// the latest epoch timestamp that the user wrote a document
207    pub last_write_timestamp: StatisticValue,
208    /// the total number of times that a user reads a document
209    pub read_count: StatisticValue,
210    /// the total number of times that a user wrote a document
211    pub write_count: StatisticValue,
212}
213
214impl DocActivityMetrics {
215    pub fn score(&self, weights: RankingWeights) -> i64 {
216        let timestamp_weight = weights.temporality;
217        let io_count_weight = weights.io;
218
219        let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
220            + self.last_write_timestamp.normalized.unwrap_or_default())
221            * timestamp_weight as f64;
222
223        let io_score = (self.read_count.normalized.unwrap_or_default()
224            + self.write_count.normalized.unwrap_or_default())
225            * io_count_weight as f64;
226
227        (io_score + temporality_score).ceil() as i64
228    }
229}
230pub trait Stats {
231    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
232}
233impl<'a, T> Stats for T
234where
235    T: Iterator<Item = &'a DocEvent>,
236{
237    fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
238        let mut result = Vec::new();
239
240        let mut set = HashMap::new();
241        for event in self {
242            match set.get_mut(&event.id()) {
243                None => {
244                    set.insert(event.id(), vec![event]);
245                }
246                Some(events) => {
247                    events.push(event);
248                }
249            }
250        }
251
252        for (id, events) in set {
253            let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
254
255            let last_read = read_events
256                .clone()
257                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
258
259            let last_read = match last_read {
260                None => 0,
261                Some(x) => x.timestamp(),
262            };
263
264            let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
265
266            let last_write = write_events
267                .clone()
268                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
269            let last_write = match last_write {
270                None => 0,
271                Some(x) => x.timestamp(),
272            };
273
274            let metrics = DocActivityMetrics {
275                id,
276                last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
277                last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
278                read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
279                write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
280            };
281            result.push(metrics);
282        }
283
284        result
285    }
286}