lb_rs/service/
activity.rs

1use crate::model::errors::LbResult;
2use crate::model::tree_like::TreeLike;
3use crate::Lb;
4use serde::Deserialize;
5use serde::Serialize;
6use std::cmp;
7use std::cmp::Ordering;
8use std::collections::HashMap;
9use uuid::Uuid;
10
11impl Lb {
12    #[instrument(level = "debug", skip(self), err(Debug))]
13    pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
14        let db = self.ro_tx().await;
15        let db = db.db();
16
17        let mut scores = db.doc_events.get().iter().get_activity_metrics();
18        self.normalize(&mut scores);
19
20        scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
21
22        scores.truncate(10);
23
24        let mut result = Vec::new();
25        let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
26        for score in scores {
27            if tree.maybe_find(&score.id).is_none() {
28                continue;
29            }
30
31            if tree.calculate_deleted(&score.id)? {
32                continue;
33            }
34            if tree.in_pending_share(&score.id)? {
35                continue;
36            }
37            result.push(score.id);
38        }
39
40        Ok(result)
41    }
42
43    #[instrument(level = "debug", skip(self), err(Debug))]
44    pub async fn clear_suggested(&self) -> LbResult<()> {
45        let mut tx = self.begin_tx().await;
46        let db = tx.db();
47        db.doc_events.clear()?;
48        Ok(())
49    }
50
51    #[instrument(level = "debug", skip(self), err(Debug))]
52    pub async fn clear_suggested_id(&self, id: Uuid) -> LbResult<()> {
53        let mut tx = self.begin_tx().await;
54        let db = tx.db();
55
56        let mut entries = db.doc_events.get().to_vec();
57        db.doc_events.clear()?;
58        entries.retain(|e| e.id() != id);
59        for entry in entries {
60            db.doc_events.push(entry)?;
61        }
62
63        Ok(())
64    }
65
66    pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
67        let mut tx = self.begin_tx().await;
68        let db = tx.db();
69
70        let max_stored_events = 1000;
71        let events = &db.doc_events;
72
73        if events.get().len() > max_stored_events {
74            db.doc_events.remove(0)?;
75        }
76        db.doc_events.push(event)?;
77        Ok(())
78    }
79
80    pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
81        let read_count_range = StatisticValueRange {
82            max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
83            min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
84        };
85
86        let write_count_range = StatisticValueRange {
87            max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
88            min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
89        };
90
91        let last_read_range = StatisticValueRange {
92            max: docs
93                .iter()
94                .map(|f| f.last_read_timestamp)
95                .max()
96                .unwrap_or_default(),
97            min: docs
98                .iter()
99                .map(|f| f.last_read_timestamp)
100                .min()
101                .unwrap_or_default(),
102        };
103        let last_write_range = StatisticValueRange {
104            max: docs
105                .iter()
106                .map(|f| f.last_write_timestamp)
107                .max()
108                .unwrap_or_default(),
109            min: docs
110                .iter()
111                .map(|f| f.last_write_timestamp)
112                .min()
113                .unwrap_or_default(),
114        };
115
116        docs.iter_mut().for_each(|f| {
117            f.read_count.normalize(read_count_range);
118            f.write_count.normalize(write_count_range);
119            f.last_read_timestamp.normalize(last_read_range);
120            f.last_write_timestamp.normalize(last_write_range);
121        });
122    }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash)]
126pub enum DocEvent {
127    Read(Uuid, i64),
128    Write(Uuid, i64),
129}
130impl DocEvent {
131    pub fn timestamp(&self) -> i64 {
132        match *self {
133            DocEvent::Read(_, timestamp) => timestamp,
134            DocEvent::Write(_, timestamp) => timestamp,
135        }
136    }
137    pub fn id(&self) -> Uuid {
138        match *self {
139            DocEvent::Read(id, _) => id,
140            DocEvent::Write(id, _) => id,
141        }
142    }
143}
144
/// Weights applied to the two components of a document's activity score.
#[derive(Debug, Copy, Clone)]
pub struct RankingWeights {
    /// Weight of a document's freshness, as determined by its last activity.
    pub temporality: i64,
    /// Weight of the amount of reading and writing done on a document.
    pub io: i64,
}

impl Default for RankingWeights {
    /// Default weighting: 60 for temporality, 40 for io.
    fn default() -> Self {
        RankingWeights { io: 40, temporality: 60 }
    }
}
/// A single activity feature: its raw value and, once computed, its
/// normalized counterpart.
///
/// NOTE(review): the derived `PartialEq` compares both `raw` and `normalized`,
/// while the ordering below looks at `raw` only, so two values can compare as
/// `Ordering::Equal` and still be `!=`. Confirm whether this is intended.
#[derive(Default, Copy, Clone, PartialEq)]
pub struct StatisticValue {
    /// The value as measured.
    pub raw: i64,
    /// The value after [`StatisticValue::normalize`]; `None` until then.
    pub normalized: Option<f64>,
}

impl Ord for StatisticValue {
    /// Orders by `raw` alone; `normalized` does not take part.
    fn cmp(&self, other: &Self) -> Ordering {
        i64::cmp(&self.raw, &other.raw)
    }
}

impl PartialOrd for StatisticValue {
    /// Always `Some`, delegating to the total order on `raw`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Eq for StatisticValue {}

/// The largest and smallest value observed for one feature.
#[derive(Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Stores `(raw - min) / (max - min)` in `normalized`, using the raw
    /// bounds of `range`.
    ///
    /// When the range is degenerate (`max == min`) the divisor is taken as 1,
    /// which avoids a division by zero.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let span = match range.max.raw - range.min.raw {
            0 => 1,
            width => width,
        };
        let offset = self.raw - range.min.raw;
        self.normalized = Some(offset as f64 / span as f64);
    }
}
193/// DocActivityMetrics stores key document activity features, which are used to recommend relevant documents to the user.
194/// Here's a walkthrough of the recommendation procedure: collect 1k most recent document events (write/read), use that activity to construct a DocActivtyMetrics struct for each document. Min-max normalizes the activity features, then rank the documents.
195#[derive(Default, Copy, Clone, PartialEq)]
196pub struct DocActivityMetrics {
197    pub id: Uuid,
198    /// the latest epoch timestamp that the user read a document
199    pub last_read_timestamp: StatisticValue,
200    /// the latest epoch timestamp that the user wrote a document
201    pub last_write_timestamp: StatisticValue,
202    /// the total number of times that a user reads a document
203    pub read_count: StatisticValue,
204    /// the total number of times that a user wrote a document
205    pub write_count: StatisticValue,
206}
207
208impl DocActivityMetrics {
209    pub fn score(&self, weights: RankingWeights) -> i64 {
210        let timestamp_weight = weights.temporality;
211        let io_count_weight = weights.io;
212
213        let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
214            + self.last_write_timestamp.normalized.unwrap_or_default())
215            * timestamp_weight as f64;
216
217        let io_score = (self.read_count.normalized.unwrap_or_default()
218            + self.write_count.normalized.unwrap_or_default())
219            * io_count_weight as f64;
220
221        (io_score + temporality_score).ceil() as i64
222    }
223}
224pub trait Stats {
225    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
226}
227impl<'a, T> Stats for T
228where
229    T: Iterator<Item = &'a DocEvent>,
230{
231    fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
232        let mut result = Vec::new();
233
234        let mut set = HashMap::new();
235        for event in self {
236            match set.get_mut(&event.id()) {
237                None => {
238                    set.insert(event.id(), vec![event]);
239                }
240                Some(events) => {
241                    events.push(event);
242                }
243            }
244        }
245
246        for (id, events) in set {
247            let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
248
249            let last_read = read_events
250                .clone()
251                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
252
253            let last_read = match last_read {
254                None => 0,
255                Some(x) => x.timestamp(),
256            };
257
258            let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
259
260            let last_write = write_events
261                .clone()
262                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
263            let last_write = match last_write {
264                None => 0,
265                Some(x) => x.timestamp(),
266            };
267
268            let metrics = DocActivityMetrics {
269                id,
270                last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
271                last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
272                read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
273                write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
274            };
275            result.push(metrics);
276        }
277
278        result
279    }
280}