use std::collections::HashMap;
use std::sync::Arc;
use log::{info, warn};
use crate::client::client_storage::client_storage::{self as cs, ClientStorage, BUCKET_META_POST_PUBLIC, BUCKET_CONFIG, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE};
use crate::client::key_locker::key_locker::KeyLocker;
use crate::client::meta_post::meta_post::{MetaPost, MetaPostPrivateV1, MetaPostPublicV1, MetaPostV1, VersionedField, merge_public, merge_private};
use crate::client::meta_post::meta_post_crypto;
use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::protocol::posting::encoded_post::EncodedPostV1;
use crate::tools::buckets::{bucket_durations_for_type, generate_bucket_location, BucketType};
use crate::tools::client_id::ClientId;
use crate::tools::json;
use crate::tools::runtime_services::RuntimeServices;
use crate::tools::time::TimeMillis;
use crate::tools::types::{Id, Salt};
/// Manages one client's meta post: the public section, the private section,
/// and the stored public sections received from other clients.
pub struct MetaPostManager {
/// Shared runtime services; its `time_provider` supplies the current time.
runtime_services: Arc<RuntimeServices>,
/// Storage backend holding the config bucket and the public meta-post bucket.
client_storage: Arc<dyn ClientStorage>,
/// Key locker handed to the crypto helpers that encrypt/decrypt the private section.
key_locker: Arc<dyn KeyLocker>,
/// Identity of the client this manager acts for; used to derive storage keys
/// and to recognise the client's own posts.
client_id: ClientId,
}
impl MetaPostManager {
pub fn new(
runtime_services: Arc<RuntimeServices>,
client_storage: Arc<dyn ClientStorage>,
key_locker: Arc<dyn KeyLocker>,
client_id: ClientId,
) -> Self {
Self { runtime_services, client_storage, key_locker, client_id }
}
/// Returns the current time as reported by the runtime services' time provider.
fn now(&self) -> TimeMillis {
    let clock = &self.runtime_services.time_provider;
    clock.current_time_millis()
}
/// Builds the config-bucket key under which this client's public meta-post
/// section is stored.
fn public_config_key(&self) -> String {
    let user_id = self.client_id.id;
    cs::config_key_for_user(user_id, BUCKET_CONFIG_KEY_META_POST_V1_PUBLIC)
}
/// Builds the config-bucket key under which this client's private meta-post
/// section is stored.
fn private_config_key(&self) -> String {
    let user_id = self.client_id.id;
    cs::config_key_for_user(user_id, BUCKET_CONFIG_KEY_META_POST_V1_PRIVATE)
}
/// Loads this client's public meta-post section from the config bucket.
///
/// Returns `MetaPostPublicV1::empty()` when nothing is stored under the key.
///
/// # Errors
///
/// Propagates any failure reported by `cs::get_struct`.
pub async fn get_public(&self) -> anyhow::Result<MetaPostPublicV1> {
    let key = self.public_config_key();
    // NOTE(review): the meaning of the `TimeMillis::zero()` argument is defined by `cs::get_struct` — confirm there.
    let stored = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &key, TimeMillis::zero()).await?;
    match stored {
        Some(public) => Ok(public),
        None => Ok(MetaPostPublicV1::empty()),
    }
}
/// Writes `public` to the config bucket under this client's public key.
///
/// # Errors
///
/// Returns whatever error `cs::put_struct` reports.
async fn put_public(&self, public: &MetaPostPublicV1) -> anyhow::Result<()> {
    let key = self.public_config_key();
    cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &key, public, TimeMillis::zero()).await
}
/// Loads this client's private meta-post section from the config bucket.
///
/// Returns `MetaPostPrivateV1::empty()` when nothing is stored under the key.
///
/// # Errors
///
/// Propagates any failure reported by `cs::get_struct`.
pub async fn get_private(&self) -> anyhow::Result<MetaPostPrivateV1> {
    let key = self.private_config_key();
    // NOTE(review): the meaning of the `TimeMillis::zero()` argument is defined by `cs::get_struct` — confirm there.
    let stored = cs::get_struct::<MetaPostPrivateV1>(self.client_storage.as_ref(), BUCKET_CONFIG, &key, TimeMillis::zero()).await?;
    match stored {
        Some(private) => Ok(private),
        None => Ok(MetaPostPrivateV1::empty()),
    }
}
/// Writes `private` to the config bucket under this client's private key.
///
/// # Errors
///
/// Returns whatever error `cs::put_struct` reports.
async fn put_private(&self, private: &MetaPostPrivateV1) -> anyhow::Result<()> {
    let key = self.private_config_key();
    cs::put_struct(self.client_storage.as_ref(), BUCKET_CONFIG, &key, private, TimeMillis::zero()).await
}
/// Updates this client's bio fields.
///
/// Builds a public section from the given values stamped with the current
/// time, merges it into the stored public section, and writes the merged
/// result both to the config bucket and to the public meta-post bucket under
/// this client's hex id.
///
/// # Errors
///
/// Propagates any storage failure from reading or writing.
pub async fn set_bio(&self, nickname: String, status: String, selfie: String, avatar: String) -> anyhow::Result<()> {
    info!("set_bio: nickname={}, status={}, selfie={}, avatar={}", nickname, status, selfie, avatar);

    let edited_at = self.now();
    let stored = self.get_public().await?;
    let edit = MetaPostPublicV1::from_bio(edited_at, nickname, status, selfie, avatar);
    let merged = merge_public(&stored, &edit);

    // Config bucket first, then the public bucket entry for this client.
    self.put_public(&merged).await?;
    let own_key = self.client_id.id.to_hex_str();
    cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &own_key, &merged, edited_at).await
}
/// Looks up the public meta-post section stored for `id` in the public
/// meta-post bucket, passing the current time to the storage helper.
///
/// Returns `Ok(None)` when the helper finds nothing for that id.
///
/// # Errors
///
/// Propagates any failure reported by `cs::get_struct`.
pub async fn get_meta_post_public(&self, id: Id) -> anyhow::Result<Option<MetaPostPublicV1>> {
    let key = id.to_hex_str();
    let found = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &key, self.now()).await?;
    Ok(found)
}
/// Returns every `(key, public section)` pair that can be read from the
/// public meta-post bucket.
///
/// Entries whose lookup fails are logged with a warning and skipped; keys
/// with no stored value are skipped silently.
///
/// # Errors
///
/// Fails only when listing the bucket's keys fails.
pub async fn get_all_meta_post_publics(&self) -> anyhow::Result<Vec<(String, MetaPostPublicV1)>> {
    let keys = self.client_storage.keys(BUCKET_META_POST_PUBLIC).await?;
    let mut collected = Vec::new();
    for key in keys {
        let lookup = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &key, TimeMillis::zero()).await;
        match lookup {
            Ok(Some(meta_post_public)) => collected.push((key.clone(), meta_post_public)),
            Ok(None) => {}
            // Best effort: one unreadable entry must not hide the rest.
            Err(e) => warn!("Failed to decode MetaPostPublicV1 with key {}: {}", key, e),
        }
    }
    Ok(collected)
}
/// Assembles this client's `MetaPostV1` and serialises it to a JSON string.
///
/// The private section is encrypted with a freshly generated random salt.
/// As a side effect, the current public section is also written to the
/// public meta-post bucket under this client's hex id.
///
/// # Errors
///
/// Propagates failures from loading either section, encrypting the private
/// section, writing to storage, or serialising.
pub async fn build_meta_post_json(&self) -> anyhow::Result<String> {
    info!("build_meta_post_json");

    let public_section = self.get_public().await?;
    let private_section = self.get_private().await?;

    let salt = Salt::random();
    let encrypted_private = meta_post_crypto::encrypt_private_section(self.key_locker.as_ref(), &salt, &private_section).await?;

    let author_hex = self.client_id.id_hex();
    let meta_post = MetaPostV1::new(author_hex, salt, public_section.clone(), encrypted_private);

    let own_key = self.client_id.id.to_hex_str();
    let stored_at = self.now();
    cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &own_key, &public_section, stored_at).await?;

    json::struct_to_string(&meta_post)
}
/// Decides whether this client's `MetaPostV1` should be auto-published now.
///
/// Fetches the post bundle for the current time in the first
/// `BucketType::User` bucket duration and scans it for a `MetaPostV1`
/// authored by this client.
///
/// Returns `Ok(false)` when the bundle is overflowed or sealed, or when such
/// a post is already present; `Ok(true)` otherwise. Posts that fail to
/// decode, expose no client id, belong to another client, or do not parse as
/// a `MetaPostV1` are skipped.
///
/// # Errors
///
/// Fails when no bucket durations exist for `BucketType::User`, when the
/// bucket location or the post bundle cannot be obtained, or when the bundle
/// header is inconsistent with its payload (fewer lengths than `num_posts`,
/// or lengths that run past the end of the encoded bytes).
pub async fn should_auto_publish(&self, post_bundle_manager: &dyn PostBundleManager) -> anyhow::Result<bool> {
    let timestamp = self.now();
    let bucket_durations = bucket_durations_for_type(BucketType::User);
    let bucket_duration = bucket_durations.first().ok_or_else(|| anyhow::anyhow!("No bucket durations for User type"))?;
    let bucket_location = generate_bucket_location(BucketType::User, self.client_id.id, *bucket_duration, timestamp)?;
    let post_bundle = post_bundle_manager.get_post_bundle(&bucket_location, timestamp).await?;

    if post_bundle.header.overflowed || post_bundle.header.sealed {
        info!("Current bucket is full/sealed, skipping auto-publish of MetaPostV1");
        return Ok(false);
    }

    let total_len = post_bundle.encoded_posts_bytes.len();
    let mut offset: usize = 0;
    for i in 0..(post_bundle.header.num_posts as usize) {
        // Checked access: an inconsistent header must surface as an error, not a panic.
        let len = *post_bundle.header.encoded_post_lengths.get(i)
            .ok_or_else(|| anyhow::anyhow!("Post bundle header has no encoded length for post index {}", i))?;
        let end = offset.checked_add(len)
            .filter(|end| *end <= total_len)
            .ok_or_else(|| anyhow::anyhow!("Post bundle encoded length for post index {} exceeds the {} available bytes", i, total_len))?;
        let post_bytes = post_bundle.encoded_posts_bytes.slice(offset..end);
        offset = end;

        // Undecodable or foreign posts are simply not the one we are looking for.
        let Ok(encoded_post) = EncodedPostV1::decode_from_bytes(post_bytes, &bucket_location.base_id, true, true) else {
            continue;
        };
        let Ok(post_client_id) = encoded_post.header.client_id() else {
            continue;
        };
        if post_client_id.id != self.client_id.id {
            continue;
        }
        if let Ok(MetaPost::MetaPostV1(_)) = MetaPost::try_parse_meta_post(&encoded_post.post) {
            info!("MetaPostV1 already exists in current bucket, skipping auto-publish");
            return Ok(false);
        }
    }

    info!("No MetaPostV1 found in most-recent, most-granular bucket, should auto-publish");
    Ok(true)
}
/// Applies a received `MetaPostV1` authored by `post_client_id`.
///
/// The post's public section replaces the stored entry for that author in
/// the public meta-post bucket when no entry exists yet or when the incoming
/// `max_timestamp()` is strictly greater than the stored one.
///
/// When the author is this client, the public section is additionally merged
/// into the local public section, and the private section is decrypted and
/// merged into the local private section. A decryption failure is logged as
/// a warning and otherwise ignored.
///
/// # Errors
///
/// Propagates storage failures from any read or write.
pub async fn process_incoming_meta_post(&self, meta_post_v1: &MetaPostV1, post_client_id: &ClientId) -> anyhow::Result<()> {
    let author_key = post_client_id.id.to_hex_str();
    let now = self.now();
    let incoming_public = &meta_post_v1.public;
    let incoming_timestamp = incoming_public.max_timestamp();

    let cached = cs::get_struct::<MetaPostPublicV1>(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &author_key, now).await?;
    let is_newer = match cached {
        Some(existing) => incoming_timestamp > existing.max_timestamp(),
        None => true,
    };
    if is_newer {
        cs::put_struct(self.client_storage.as_ref(), BUCKET_META_POST_PUBLIC, &author_key, incoming_public, now).await?;
    }

    // Everything below only concerns posts authored by this client.
    if post_client_id.id != self.client_id.id {
        return Ok(());
    }

    let stored_public = self.get_public().await?;
    let merged_public = merge_public(&stored_public, incoming_public);
    self.put_public(&merged_public).await?;

    let decrypted = meta_post_crypto::decrypt_private_section(self.key_locker.as_ref(), &meta_post_v1.encryption_salt, &meta_post_v1.private_encrypted).await;
    match decrypted {
        Err(e) => {
            warn!("Failed to decrypt own MetaPostV1 private section: {}", e);
        }
        Ok(incoming_private) => {
            let stored_private = self.get_private().await?;
            let merged_private = merge_private(&stored_private, &incoming_private, now);
            self.put_private(&merged_private).await?;
        }
    }
    Ok(())
}
/// Lists the client ids whose follow entry currently holds `Some(true)`.
///
/// # Errors
///
/// Fails when the private section cannot be loaded, or when a followed
/// entry's key cannot be parsed by `Id::from_hex_str`.
pub async fn get_followed_client_ids(&self) -> anyhow::Result<Vec<Id>> {
    let private = self.get_private().await?;
    let mut followed: Vec<Id> = Vec::new();
    for (hex_key, field) in private.followed_client_ids.iter() {
        if field.value == Some(true) {
            followed.push(Id::from_hex_str(hex_key)?);
        }
    }
    Ok(followed)
}
/// Replaces the set of followed clients with `client_ids`.
///
/// Every entry currently holding `Some(true)` is first turned into a
/// tombstone stamped with the current time; then each id in `client_ids` is
/// (re)inserted as followed with that same timestamp.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_followed_client_ids(&self, client_ids: Vec<Id>) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;

    private.followed_client_ids.values_mut()
        .filter(|field| field.value == Some(true))
        .for_each(|field| *field = VersionedField::tombstone(timestamp));

    for followed_id in client_ids.iter() {
        let key = followed_id.to_hex_str();
        private.followed_client_ids.insert(key, VersionedField::new(true, timestamp));
    }

    self.put_private(&private).await
}
/// Marks a single client as followed or not followed.
///
/// Stores a `true` field when `is_followed` is set, otherwise a tombstone;
/// both are stamped with the current time.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_followed_client_id(&self, client_id: Id, is_followed: bool) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;
    let key = client_id.to_hex_str();
    if is_followed {
        private.followed_client_ids.insert(key, VersionedField::new(true, timestamp));
    } else {
        private.followed_client_ids.insert(key, VersionedField::tombstone(timestamp));
    }
    self.put_private(&private).await
}
/// Lists the hashtags whose follow entry currently holds `Some(true)`.
///
/// # Errors
///
/// Fails when the private section cannot be loaded.
pub async fn get_followed_hashtags(&self) -> anyhow::Result<Vec<String>> {
    let private = self.get_private().await?;
    let mut followed: Vec<String> = Vec::new();
    for (tag, field) in private.followed_hashtags.iter() {
        if field.value == Some(true) {
            followed.push(tag.clone());
        }
    }
    Ok(followed)
}
/// Replaces the set of followed hashtags with `hashtags`.
///
/// Every entry currently holding `Some(true)` is first turned into a
/// tombstone stamped with the current time; then each given hashtag is
/// (re)inserted as followed with that same timestamp.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_followed_hashtags(&self, hashtags: Vec<String>) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;

    private.followed_hashtags.values_mut()
        .filter(|field| field.value == Some(true))
        .for_each(|field| *field = VersionedField::tombstone(timestamp));

    for tag in hashtags.iter() {
        let followed = VersionedField::new(true, timestamp);
        private.followed_hashtags.insert(tag.clone(), followed);
    }

    self.put_private(&private).await
}
/// Marks a single hashtag as followed or not followed.
///
/// Stores a `true` field when `is_followed` is set, otherwise a tombstone;
/// both are stamped with the current time.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_followed_hashtag(&self, hashtag: String, is_followed: bool) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;
    if is_followed {
        private.followed_hashtags.insert(hashtag, VersionedField::new(true, timestamp));
    } else {
        private.followed_hashtags.insert(hashtag, VersionedField::tombstone(timestamp));
    }
    self.put_private(&private).await
}
/// Returns the content thresholds that currently carry a value, keyed by
/// feedback type. Entries whose value is `None` are left out.
///
/// # Errors
///
/// Fails when the private section cannot be loaded.
pub async fn get_content_thresholds(&self) -> anyhow::Result<HashMap<u8, u32>> {
    let private = self.get_private().await?;
    let mut thresholds: HashMap<u8, u32> = HashMap::new();
    for (feedback_type, field) in private.content_thresholds.iter() {
        if let Some(threshold) = field.value {
            thresholds.insert(*feedback_type, threshold);
        }
    }
    Ok(thresholds)
}
/// Sets the threshold for one feedback type, stamped with the current time.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_content_threshold(&self, feedback_type: u8, threshold: u32) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;
    let entry = VersionedField::new(threshold, timestamp);
    private.content_thresholds.insert(feedback_type, entry);
    self.put_private(&private).await
}
/// Sets several thresholds at once, all stamped with the same current time.
/// Feedback types not present in `thresholds` are left untouched.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_content_thresholds(&self, thresholds: HashMap<u8, u32>) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;
    thresholds.into_iter().for_each(|(feedback_type, threshold)| {
        let entry = VersionedField::new(threshold, timestamp);
        private.content_thresholds.insert(feedback_type, entry);
    });
    self.put_private(&private).await
}
/// Reads the "skip warnings for followed" flag, treating an unset value as `false`.
///
/// # Errors
///
/// Fails when the private section cannot be loaded.
pub async fn get_skip_warnings_for_followed(&self) -> anyhow::Result<bool> {
    let private = self.get_private().await?;
    match private.skip_warnings_for_followed.value {
        Some(flag) => Ok(flag),
        None => Ok(false),
    }
}
/// Stores the "skip warnings for followed" flag, stamped with the current time.
///
/// # Errors
///
/// Propagates storage failures from loading or saving the private section.
pub async fn set_skip_warnings_for_followed(&self, value: bool) -> anyhow::Result<()> {
    let timestamp = self.now();
    let mut private = self.get_private().await?;
    let updated = VersionedField::new(value, timestamp);
    private.skip_warnings_for_followed = updated;
    self.put_private(&private).await
}
}