use std::sync::Arc;
use bytes::Bytes;
use crate::anyhow_assert_eq;
use crate::protocol::payload::payload::{
CachePostBundleFeedbackResponseV1, CachePostBundleFeedbackV1, CachePostBundleResponseV1,
CachePostBundleV1, CacheRequestTokenV1, PayloadRequestKind, PayloadResponseKind,
};
use crate::protocol::rpc;
use crate::tools::runtime_services::RuntimeServices;
use crate::tools::tools::spawn_background_task;
use crate::tools::types::Id;
use crate::tools::json;
use log::warn;
/// Pushes post bundles to the cache peers named by `tokens`, in the background.
///
/// The call returns immediately; nothing is spawned when either `tokens` or
/// `bundles` is empty. Inside the spawned task the tokens are handled one after
/// another:
///
/// * a token that reports itself expired at the current time is skipped;
/// * bundles whose originator id appears in the token's
///   `already_cached_peer_ids` are left out, and the token is skipped when
///   nothing remains;
/// * the remaining bundle bytes are sent to `token.peer` as a
///   `CachePostBundleV1` request, and the reply must be a decodable
///   `CachePostBundleResponseV1`.
///
/// A failed upload is only logged with `warn!`; the remaining tokens are still
/// processed.
pub fn upload_post_bundle_caches(
    runtime_services: Arc<RuntimeServices>,
    sponsor_id: Id,
    tokens: Vec<CacheRequestTokenV1>,
    bundles: Vec<(Id, Bytes)>,
) {
    if tokens.is_empty() || bundles.is_empty() {
        return;
    }

    spawn_background_task(async move {
        for token in tokens.iter() {
            // The clock is read afresh for every token.
            let current_millis = runtime_services.time_provider.current_time_millis();
            if token.is_expired(current_millis) {
                continue;
            }

            // Keep only the bundles this peer has not reported as cached.
            let pending: Vec<&[u8]> = bundles
                .iter()
                .filter_map(|(originator_id, payload)| {
                    if token.already_cached_peer_ids.contains(originator_id) {
                        None
                    } else {
                        Some(payload.as_ref())
                    }
                })
                .collect();
            if pending.is_empty() {
                continue;
            }

            // Run one upload in its own fallible scope so `?` cannot abort the loop.
            let outcome: anyhow::Result<()> = async {
                let request = CachePostBundleV1::new_to_bytes(token, &pending)?;
                let reply = rpc::rpc::rpc_server_known(
                    &runtime_services,
                    &sponsor_id,
                    &token.peer,
                    PayloadRequestKind::CachePostBundleV1,
                    request,
                )
                .await?;
                anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleResponseV1, &reply.response_request_kind);
                // The decoded value is discarded; decoding only validates the reply.
                json::bytes_to_struct::<CachePostBundleResponseV1>(&reply.bytes)?;
                Ok(())
            }
            .await;

            match outcome {
                Ok(()) => {}
                Err(err) => {
                    warn!("Failed to upload post bundle cache to {}: {}", token.peer, err);
                }
            }
        }
    });
}
/// Pushes one feedback payload to the cache peers named by `tokens`, in the
/// background.
///
/// The call returns immediately; nothing is spawned when either `tokens` or
/// `feedback_bytes` is empty. Inside the spawned task the tokens are handled one
/// after another: a token that reports itself expired at the current time is
/// skipped, otherwise `feedback_bytes` is sent to `token.peer` as a
/// `CachePostBundleFeedbackV1` request and the reply must be a decodable
/// `CachePostBundleFeedbackResponseV1`.
///
/// A failed upload is only logged with `warn!`; the remaining tokens are still
/// processed.
pub fn upload_post_bundle_feedback_caches(
    runtime_services: Arc<RuntimeServices>,
    sponsor_id: Id,
    tokens: Vec<CacheRequestTokenV1>,
    feedback_bytes: Bytes,
) {
    if tokens.is_empty() || feedback_bytes.is_empty() {
        return;
    }

    spawn_background_task(async move {
        for token in tokens.iter() {
            // The clock is read afresh for every token.
            let current_millis = runtime_services.time_provider.current_time_millis();
            if token.is_expired(current_millis) {
                continue;
            }

            // Run one upload in its own fallible scope so `?` cannot abort the loop.
            let outcome: anyhow::Result<()> = async {
                let request = CachePostBundleFeedbackV1::new_to_bytes(token, &feedback_bytes)?;
                let reply = rpc::rpc::rpc_server_known(
                    &runtime_services,
                    &sponsor_id,
                    &token.peer,
                    PayloadRequestKind::CachePostBundleFeedbackV1,
                    request,
                )
                .await?;
                anyhow_assert_eq!(&PayloadResponseKind::CachePostBundleFeedbackResponseV1, &reply.response_request_kind);
                // The decoded value is discarded; decoding only validates the reply.
                json::bytes_to_struct::<CachePostBundleFeedbackResponseV1>(&reply.bytes)?;
                Ok(())
            }
            .await;

            match outcome {
                Ok(()) => {}
                Err(err) => {
                    warn!("Failed to upload post bundle feedback cache to {}: {}", token.peer, err);
                }
            }
        }
    });
}