1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! Utility definitions to consturct feed generator
use std::{
  cmp::Ordering,
  collections::HashMap,
  fmt::Display,
  str::FromStr,
  sync::{Arc, RwLock},
};

use chrono::{DateTime, Utc};
use libipld::Cid;
use serde::{Deserialize, Serialize};

use crate::api::{AppBskyFeedDefsSkeletonfeedpost, AppBskyFeedGetfeedskeleton, AppBskyFeedPost};

/// Structure of PDS posts
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedPost {
  pub uri: String,
  pub cid: Cid,
  pub repo: String,
  pub indexed_at: DateTime<Utc>,
  pub post: AppBskyFeedPost,
  pub extra: HashMap<String, String>,
}

impl FeedPost {
  /// Create a record of PDS post
  pub fn new(uri: &str, cid: &str, repo: &str, post: &AppBskyFeedPost) -> Self {
    Self {
      uri: uri.to_string(),
      cid: Cid::from_str(cid).unwrap_or_default(),
      repo: repo.to_string(),
      indexed_at: Utc::now(),
      post: post.clone(),
      extra: HashMap::new(),
    }
  }

  /// Compare if the post is older than the cursor
  pub fn is_old(&self, cursor: &Cursor) -> bool {
    match self.indexed_at.cmp(&cursor.indexed_at) {
      Ordering::Less => true,
      Ordering::Equal => self.cid < cursor.cid,
      Ordering::Greater => false,
    }
  }

  /// Convert to cursor
  pub fn to_cursor(&self) -> Cursor {
    Cursor {
      indexed_at: self.indexed_at,
      cid: self.cid,
    }
  }

  /// Convert to FeedGenerator response
  pub fn to_response(&self) -> AppBskyFeedDefsSkeletonfeedpost {
    AppBskyFeedDefsSkeletonfeedpost {
      post: self.uri.clone(),
      ..Default::default()
    }
  }

  /// Insert extra value
  pub fn insert<T1: ToString, T2: ToString>(&mut self, key: T1, value: T2) {
    self.extra.insert(key.to_string(), value.to_string());
  }

  /// Get extra value
  pub fn get(&mut self, key: &str) -> Option<String> {
    self.extra.get(key).cloned()
  }
}

/// List of PDS posts
pub struct FeedPosts {
  pub posts: Arc<RwLock<Vec<FeedPost>>>,
}

impl Default for FeedPosts {
  fn default() -> Self {
    Self {
      posts: Arc::new(RwLock::new(Vec::new())),
    }
  }
}

impl Clone for FeedPosts {
  fn clone(&self) -> Self {
    Self {
      posts: Arc::clone(&self.posts),
    }
  }
}

impl From<Vec<FeedPost>> for FeedPosts {
  fn from(value: Vec<FeedPost>) -> Self {
    Self {
      posts: Arc::new(RwLock::new(value)),
    }
  }
}

impl FeedPosts {
  /// Append PDS posts to this list
  pub fn append_posts(&mut self, new_posts: &[FeedPost]) {
    let mut new_posts = new_posts.to_vec();
    let mut posts = self.posts.write().unwrap();
    posts.append(&mut new_posts);
  }

  /// Delete PDS posts from this list
  pub fn delete_posts(&self, uris: &[String]) {
    let mut posts = self.posts.write().unwrap();
    *posts = posts
      .iter()
      .filter(|p| !uris.contains(&p.uri))
      .cloned()
      .collect::<Vec<_>>();
  }

  /// Get all PDS posts of this list
  pub fn get_all_posts(&self) -> Vec<FeedPost> {
    let mut posts = self.posts.read().unwrap().clone();
    posts.sort_by(|a, b| match b.indexed_at.cmp(&a.indexed_at) {
      Ordering::Equal => b.cid.cmp(&a.cid),
      o => o,
    });
    posts
  }

  /// Get a fixed number of PDS posts
  pub fn get_old_posts(&self, limit: usize, cursor: Option<Cursor>) -> AppBskyFeedGetfeedskeleton {
    let posts = self.get_all_posts();
    let posts = match cursor {
      Some(c) => posts
        .into_iter()
        .filter(|p| p.is_old(&c))
        .collect::<Vec<_>>(),
      None => posts,
    };
    let feed = posts.iter().take(limit).cloned().collect::<Vec<_>>();
    AppBskyFeedGetfeedskeleton {
      feed: feed.iter().map(|p| p.to_response()).collect::<Vec<_>>(),
      cursor: (posts.last().map(|p| &p.uri) != feed.last().map(|p| &p.uri))
        .then(|| feed)
        .and_then(|f| f.last().map(|p| p.to_cursor().to_string())),
    }
  }
}

/// Cursor
pub struct Cursor {
  pub indexed_at: DateTime<Utc>,
  pub cid: Cid,
}

impl Display for Cursor {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_fmt(format_args!(
      "{}::{}",
      self.indexed_at.timestamp_millis(),
      self.cid.to_string()
    ))
  }
}

impl Cursor {
  /// Parse http request parameter as a cursor
  pub fn parse(cursor: &str) -> Option<Self> {
    let mut sp = cursor.split("::");
    let ts = sp.next()?.parse::<i64>().ok()?;
    let indexed_at = DateTime::from_timestamp(ts / 1000, (ts % 1000) as u32)?;
    let cid = Cid::from_str(sp.next()?).ok()?;
    Some(Self { indexed_at, cid })
  }

  /// Convert PDS post to cursor
  pub fn from_feedpost(post: &FeedPost) -> Option<Self> {
    Some(Self {
      indexed_at: post.indexed_at,
      cid: post.cid,
    })
  }
}