lb_rs/service/
activity.rs

1use crate::Lb;
2use crate::model::errors::LbResult;
3use crate::model::tree_like::TreeLike;
4use serde::{Deserialize, Serialize};
5use std::cmp;
6use std::cmp::Ordering;
7use std::collections::HashMap;
8use uuid::Uuid;
9
10impl Lb {
11    #[instrument(level = "debug", skip(self), err(Debug))]
12    pub async fn suggested_docs(&self, settings: RankingWeights) -> LbResult<Vec<Uuid>> {
13        let db = self.ro_tx().await;
14        let db = db.db();
15
16        let mut scores = db.doc_events.get().iter().get_activity_metrics();
17        self.normalize(&mut scores);
18
19        scores.sort_unstable_by_key(|b| cmp::Reverse(b.score(settings)));
20
21        scores.truncate(10);
22
23        let mut result = Vec::new();
24        let mut tree = (&db.base_metadata).to_staged(&db.local_metadata).to_lazy();
25        for score in scores {
26            if tree.maybe_find(&score.id).is_none() {
27                continue;
28            }
29
30            if tree.calculate_deleted(&score.id)? {
31                continue;
32            }
33            if tree.in_pending_share(&score.id)? {
34                continue;
35            }
36            result.push(score.id);
37        }
38
39        Ok(result)
40    }
41
42    #[instrument(level = "debug", skip(self), err(Debug))]
43    pub async fn clear_suggested(&self) -> LbResult<()> {
44        let mut tx = self.begin_tx().await;
45        let db = tx.db();
46        db.doc_events.clear()?;
47        Ok(())
48    }
49
50    #[instrument(level = "debug", skip(self), err(Debug))]
51    pub async fn clear_suggested_id(&self, id: Uuid) -> LbResult<()> {
52        let mut tx = self.begin_tx().await;
53        let db = tx.db();
54
55        let mut entries = db.doc_events.get().to_vec();
56        db.doc_events.clear()?;
57        entries.retain(|e| e.id() != id);
58        for entry in entries {
59            db.doc_events.push(entry)?;
60        }
61
62        Ok(())
63    }
64
65    pub(crate) async fn add_doc_event(&self, event: DocEvent) -> LbResult<()> {
66        let mut tx = self.begin_tx().await;
67        let db = tx.db();
68
69        let max_stored_events = 1000;
70        let events = &db.doc_events;
71
72        if events.get().len() > max_stored_events {
73            db.doc_events.remove(0)?;
74        }
75        db.doc_events.push(event)?;
76        Ok(())
77    }
78
79    pub(crate) fn normalize(&self, docs: &mut [DocActivityMetrics]) {
80        let read_count_range = StatisticValueRange {
81            max: docs.iter().map(|f| f.read_count).max().unwrap_or_default(),
82            min: docs.iter().map(|f| f.read_count).min().unwrap_or_default(),
83        };
84
85        let write_count_range = StatisticValueRange {
86            max: docs.iter().map(|f| f.write_count).max().unwrap_or_default(),
87            min: docs.iter().map(|f| f.write_count).min().unwrap_or_default(),
88        };
89
90        let last_read_range = StatisticValueRange {
91            max: docs
92                .iter()
93                .map(|f| f.last_read_timestamp)
94                .max()
95                .unwrap_or_default(),
96            min: docs
97                .iter()
98                .map(|f| f.last_read_timestamp)
99                .min()
100                .unwrap_or_default(),
101        };
102        let last_write_range = StatisticValueRange {
103            max: docs
104                .iter()
105                .map(|f| f.last_write_timestamp)
106                .max()
107                .unwrap_or_default(),
108            min: docs
109                .iter()
110                .map(|f| f.last_write_timestamp)
111                .min()
112                .unwrap_or_default(),
113        };
114
115        docs.iter_mut().for_each(|f| {
116            f.read_count.normalize(read_count_range);
117            f.write_count.normalize(write_count_range);
118            f.last_read_timestamp.normalize(last_read_range);
119            f.last_write_timestamp.normalize(last_write_range);
120        });
121    }
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialEq, PartialOrd, Eq, Hash, Copy)]
125pub enum DocEvent {
126    Read(Uuid, i64),
127    Write(Uuid, i64),
128}
129impl DocEvent {
130    pub fn timestamp(&self) -> i64 {
131        match *self {
132            DocEvent::Read(_, timestamp) => timestamp,
133            DocEvent::Write(_, timestamp) => timestamp,
134        }
135    }
136    pub fn id(&self) -> Uuid {
137        match *self {
138            DocEvent::Read(id, _) => id,
139            DocEvent::Write(id, _) => id,
140        }
141    }
142}
143
/// Weights applied to the two groups of activity features when scoring a document.
#[derive(Debug, Copy, Clone)]
pub struct RankingWeights {
    /// weight of a doc's freshness, i.e. how recent its last read and write are
    pub temporality: i64,
    /// weight of the number of reads and writes on a doc
    pub io: i64,
}

impl Default for RankingWeights {
    /// Default weighting: 60 for `temporality`, 40 for `io`.
    fn default() -> Self {
        RankingWeights { io: 40, temporality: 60 }
    }
}
/// A raw measurement together with its (optional) min-max normalized form.
///
/// Ordering (`Ord` / `PartialOrd`) only looks at `raw`; the derived `PartialEq` compares
/// `normalized` as well.
#[derive(Default, Copy, Clone, PartialEq)]
pub struct StatisticValue {
    /// the measurement as recorded
    pub raw: i64,
    /// `raw` rescaled relative to a range; `None` until `normalize` has been called
    pub normalized: Option<f64>,
}

impl Ord for StatisticValue {
    /// Compares by `raw` only.
    fn cmp(&self, other: &Self) -> Ordering {
        (self.raw).cmp(&other.raw)
    }
}

impl PartialOrd for StatisticValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for StatisticValue {}

/// The largest and smallest value of a statistic, used as the reference for normalization.
#[derive(Clone, Copy)]
pub struct StatisticValueRange {
    pub max: StatisticValue,
    pub min: StatisticValue,
}

impl StatisticValue {
    /// Stores `(raw - min) / (max - min)` in `normalized`, using the bounds from `range`.
    ///
    /// A degenerate range (`max == min`) is treated as having width 1, so the division is
    /// always defined. The arithmetic is done in `i128` so that widely separated `i64`
    /// bounds cannot overflow.
    pub fn normalize(&mut self, range: StatisticValueRange) {
        let min = i128::from(range.min.raw);

        // width of the range; never 0 so it is safe to divide by
        let width = match i128::from(range.max.raw) - min {
            0 => 1,
            distance => distance,
        };
        let offset = i128::from(self.raw) - min;

        self.normalized = Some(offset as f64 / width as f64);
    }
}
192/// DocActivityMetrics stores key document activity features, which are used to recommend relevant documents to the user.
193/// Here's a walkthrough of the recommendation procedure: collect 1k most recent document events (write/read), use that activity to construct a DocActivtyMetrics struct for each document. Min-max normalizes the activity features, then rank the documents.
194#[derive(Default, Copy, Clone, PartialEq)]
195pub struct DocActivityMetrics {
196    pub id: Uuid,
197    /// the latest epoch timestamp that the user read a document
198    pub last_read_timestamp: StatisticValue,
199    /// the latest epoch timestamp that the user wrote a document
200    pub last_write_timestamp: StatisticValue,
201    /// the total number of times that a user reads a document
202    pub read_count: StatisticValue,
203    /// the total number of times that a user wrote a document
204    pub write_count: StatisticValue,
205}
206
207impl DocActivityMetrics {
208    pub fn score(&self, weights: RankingWeights) -> i64 {
209        let timestamp_weight = weights.temporality;
210        let io_count_weight = weights.io;
211
212        let temporality_score = (self.last_read_timestamp.normalized.unwrap_or_default()
213            + self.last_write_timestamp.normalized.unwrap_or_default())
214            * timestamp_weight as f64;
215
216        let io_score = (self.read_count.normalized.unwrap_or_default()
217            + self.write_count.normalized.unwrap_or_default())
218            * io_count_weight as f64;
219
220        (io_score + temporality_score).ceil() as i64
221    }
222}
223pub trait Stats {
224    fn get_activity_metrics(self) -> Vec<DocActivityMetrics>;
225}
226impl<'a, T> Stats for T
227where
228    T: Iterator<Item = &'a DocEvent>,
229{
230    fn get_activity_metrics(self) -> Vec<DocActivityMetrics> {
231        let mut result = Vec::new();
232
233        let mut set = HashMap::new();
234        for event in self {
235            match set.get_mut(&event.id()) {
236                None => {
237                    set.insert(event.id(), vec![event]);
238                }
239                Some(events) => {
240                    events.push(event);
241                }
242            }
243        }
244
245        for (id, events) in set {
246            let read_events = events.iter().filter(|e| matches!(e, DocEvent::Read(_, _)));
247
248            let last_read = read_events
249                .clone()
250                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
251
252            let last_read = match last_read {
253                None => 0,
254                Some(x) => x.timestamp(),
255            };
256
257            let write_events = events.iter().filter(|e| matches!(e, DocEvent::Write(_, _)));
258
259            let last_write = write_events
260                .clone()
261                .max_by(|x, y| x.timestamp().cmp(&y.timestamp()));
262            let last_write = match last_write {
263                None => 0,
264                Some(x) => x.timestamp(),
265            };
266
267            let metrics = DocActivityMetrics {
268                id,
269                last_read_timestamp: StatisticValue { raw: last_read, normalized: None },
270                last_write_timestamp: StatisticValue { raw: last_write, normalized: None },
271                read_count: StatisticValue { raw: read_events.count() as i64, normalized: None },
272                write_count: StatisticValue { raw: write_events.count() as i64, normalized: None },
273            };
274            result.push(metrics);
275        }
276
277        result
278    }
279}