use crate::{
BoxFut, DynOpStore, DynReport, K2Result, OpId, SpaceId, Url, builder,
config, transport::DynTransport,
};
use crate::{DynPeerMetaStore, op_store};
use bytes::{Bytes, BytesMut};
use prost::Message;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// Crate-private module holding the fetch protocol message types.
///
/// The contents are pulled in verbatim from the generated file
/// `../proto/gen/kitsune2.fetch.rs`; do not edit them by hand here.
pub(crate) mod proto {
include!("../proto/gen/kitsune2.fetch.rs");
}
pub use proto::{
FetchRequest, FetchResponse, K2FetchMessage, k2_fetch_message::*,
};
impl From<Vec<OpId>> for FetchRequest {
    /// Build a [`FetchRequest`] from a list of op ids.
    ///
    /// Each [`OpId`] is converted into the wire representation used by the
    /// `op_ids` field of the message; order is preserved.
    fn from(value: Vec<OpId>) -> Self {
        let op_ids = value.into_iter().map(|op_id| op_id.into()).collect();
        Self { op_ids }
    }
}
impl From<FetchRequest> for Vec<OpId> {
fn from(value: FetchRequest) -> Self {
value.op_ids.into_iter().map(Into::into).collect()
}
}
impl From<Vec<Bytes>> for FetchResponse {
    /// Build a [`FetchResponse`] from a list of raw op payloads.
    ///
    /// Each [`Bytes`] value is converted into the element type of the
    /// message's `ops` field; order is preserved.
    fn from(value: Vec<Bytes>) -> Self {
        let ops = value.into_iter().map(|op_data| op_data.into()).collect();
        Self { ops }
    }
}
/// Encode a list of op ids as a protobuf [`FetchRequest`].
///
/// Returns the encoded message as an immutable [`Bytes`] buffer.
///
/// # Panics
///
/// Panics with "failed to encode request" if writing the message into the
/// buffer fails.
pub fn serialize_request(value: Vec<OpId>) -> Bytes {
    let request = FetchRequest::from(value);
    let mut buf = BytesMut::new();
    request
        .encode(&mut buf)
        .expect("failed to encode request");
    buf.freeze()
}
/// Encode a list of op ids as a [`K2FetchMessage`] of type `Request`.
///
/// The op ids are first encoded with [`serialize_request`]; the resulting
/// bytes become the `data` payload of the outer message, which is then
/// encoded in turn.
///
/// # Panics
///
/// Panics with "failed to encode fetch request message" if writing the outer
/// message into the buffer fails.
pub fn serialize_request_message(value: Vec<OpId>) -> Bytes {
    let envelope = K2FetchMessage {
        fetch_message_type: i32::from(FetchMessageType::Request),
        data: serialize_request(value),
    };
    let mut buf = BytesMut::new();
    envelope
        .encode(&mut buf)
        .expect("failed to encode fetch request message");
    buf.freeze()
}
/// Encode a list of op payloads as a protobuf [`FetchResponse`].
///
/// Returns the encoded message as an immutable [`Bytes`] buffer.
///
/// # Panics
///
/// Panics with "failed to encode response" if writing the message into the
/// buffer fails.
pub fn serialize_response(value: Vec<Bytes>) -> Bytes {
    let response = FetchResponse::from(value);
    let mut buf = BytesMut::new();
    response
        .encode(&mut buf)
        .expect("failed to encode response");
    buf.freeze()
}
/// Encode a list of op payloads as a [`K2FetchMessage`] of type `Response`.
///
/// The payloads are first encoded with [`serialize_response`]; the resulting
/// bytes become the `data` payload of the outer message, which is then
/// encoded in turn.
///
/// # Panics
///
/// Panics with "failed to encode fetch response message" if writing the
/// outer message into the buffer fails.
pub fn serialize_response_message(value: Vec<Bytes>) -> Bytes {
    let envelope = K2FetchMessage {
        fetch_message_type: i32::from(FetchMessageType::Response),
        data: serialize_response(value),
    };
    let mut buf = BytesMut::new();
    envelope
        .encode(&mut buf)
        .expect("failed to encode fetch response message");
    buf.freeze()
}
/// Interface of a fetch module.
///
/// Implementors must be `'static + Send + Sync` and implement
/// [`std::fmt::Debug`], which allows them to be shared as a trait object
/// (see [`DynFetch`]).
pub trait Fetch: 'static + Send + Sync + std::fmt::Debug {
/// Submit a list of op ids together with the [`Url`] they are associated
/// with.
///
/// Returns a boxed future resolving to `Ok(())` or a K2 error.
///
/// NOTE(review): presumably this queues the ops to be fetched from the peer
/// at `source` — confirm against the implementations.
fn request_ops(
&self,
op_ids: Vec<OpId>,
source: Url,
) -> BoxFut<'_, K2Result<()>>;
/// Register a oneshot sender with the module.
///
/// NOTE(review): presumably `notify` is signalled once no requests remain
/// outstanding — confirm against the implementations.
fn notify_on_drained(&self, notify: futures::channel::oneshot::Sender<()>);
/// Produce a [`FetchStateSummary`] describing the module's current state.
///
/// Returns a boxed future resolving to the summary or a K2 error.
fn get_state_summary(&self) -> BoxFut<'_, K2Result<FetchStateSummary>>;
}
/// Reference-counted trait object for [`Fetch`].
pub type DynFetch = Arc<dyn Fetch>;
/// Factory interface for constructing [`Fetch`] instances.
///
/// Implementors must be `'static + Send + Sync` and implement
/// [`std::fmt::Debug`], which allows them to be shared as a trait object
/// (see [`DynFetchFactory`]).
pub trait FetchFactory: 'static + Send + Sync + std::fmt::Debug {
/// Hook that receives a mutable [`config::Config`].
///
/// NOTE(review): presumably used to write this module's default settings
/// into `config` — confirm against the implementations.
fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
/// Check the given configuration, returning an error if it is not
/// acceptable to this factory.
fn validate_config(&self, config: &config::Config) -> K2Result<()>;
/// Construct a fetch instance.
///
/// Takes the shared builder, the space id, and handles to the report
/// module, op store, peer meta store and transport. Returns a `'static`
/// boxed future resolving to the new [`DynFetch`] or a K2 error.
fn create(
&self,
builder: Arc<builder::Builder>,
space_id: SpaceId,
report: DynReport,
op_store: DynOpStore,
peer_meta_store: DynPeerMetaStore,
transport: DynTransport,
) -> BoxFut<'static, K2Result<DynFetch>>;
}
/// Reference-counted trait object for [`FetchFactory`].
pub type DynFetchFactory = Arc<dyn FetchFactory>;
/// Serializable summary of a fetch module's state, as returned by
/// [`Fetch::get_state_summary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchStateSummary {
/// Pending requests, keyed by op id, each mapped to a list of URLs.
///
/// NOTE(review): presumably the URLs are the peers the op is being
/// requested from — confirm against the implementations.
pub pending_requests: HashMap<OpId, Vec<Url>>,
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::id::Id;
    use prost::Message;

    // A `FetchRequest` survives a protobuf round trip and converts back to
    // the original list of op ids.
    #[test]
    fn happy_request_encode_decode() {
        let ids = vec![
            OpId::from(Bytes::from_static(b"some_op_id")),
            OpId::from(Bytes::from_static(b"another_op_id")),
        ];

        let request = FetchRequest::from(ids.clone());
        let encoded = request.encode_to_vec();
        let decoded = FetchRequest::decode(encoded.as_slice()).unwrap();
        assert_eq!(request, decoded);

        let round_tripped: Vec<OpId> = decoded.into();
        assert_eq!(ids, round_tripped);
    }

    // `serialize_response` output decodes to a `FetchResponse` carrying the
    // same op payloads.
    #[test]
    fn happy_response_encode_decode() {
        let expected = vec![Bytes::from(vec![0]), Bytes::from(vec![1])];

        let encoded = serialize_response(expected.clone());
        let decoded = FetchResponse::decode(encoded).unwrap();

        let actual: Vec<_> =
            decoded.ops.into_iter().map(|op| op.data).collect();
        assert_eq!(expected, actual);
    }

    // `serialize_request_message` output decodes to a `Request` typed
    // envelope whose payload is the original list of op ids.
    #[test]
    fn happy_fetch_request_encode_decode() {
        let ids = vec![OpId(Id(Bytes::from_static(b"id_1")))];

        let encoded = serialize_request_message(ids.clone());
        let envelope = K2FetchMessage::decode(encoded).unwrap();
        assert_eq!(
            envelope.fetch_message_type,
            i32::from(FetchMessageType::Request)
        );

        let request = FetchRequest::decode(envelope.data).unwrap();
        let decoded_ids: Vec<OpId> =
            request.op_ids.into_iter().map(Into::into).collect();
        assert_eq!(ids, decoded_ids);
    }

    // `serialize_response_message` output decodes to a `Response` typed
    // envelope whose payload is the original list of op payloads.
    #[test]
    fn happy_fetch_response_encode_decode() {
        let expected = vec![Bytes::from(vec![0])];

        let encoded = serialize_response_message(expected.clone());
        let envelope = K2FetchMessage::decode(encoded).unwrap();
        assert_eq!(
            envelope.fetch_message_type,
            i32::from(FetchMessageType::Response)
        );

        let response = FetchResponse::decode(envelope.data).unwrap();
        let actual: Vec<_> =
            response.ops.into_iter().map(|op| op.data).collect();
        assert_eq!(expected, actual);
    }
}