#![allow(dead_code)]
pub(crate) mod op_ctx_task;
use freenet_stdlib::client_api::{ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;
use std::collections::HashSet;
use std::fmt::Display;
use std::time::Instant;
use crate::client_events::HostResult;
use crate::node::IsOperationCompleted;
use crate::{
contract::StoreResponse,
message::{InnerMessage, Transaction},
operations::{OpError, OpOutcome},
ring::{Location, PeerKeyLocation},
};
pub(crate) use self::messages::{GetMsg, GetMsgResult, GetStreamingPayload};
// Tuning constants for GET routing. Their use sites are not visible in this part of
// the file, so the notes below are inferred from the names only — TODO confirm.
/// Presumably the upper bound on retries for a single GET — confirm against the retry logic.
const MAX_RETRIES: usize = 10;
/// Presumably the default number of candidate peers considered per hop — confirm.
const DEFAULT_MAX_BREADTH: usize = 3;
/// Presumably the smallest `htl` value used when a GET is retried — confirm.
const MIN_RETRY_HTL: usize = 3;
/// Payload of [`GetState::PrepareRequest`]: the parameters of a GET that has been
/// set up but has not yet been turned into an awaiting-response state.
#[derive(Debug)]
struct PrepareRequestData {
    /// Instance id of the contract being requested.
    instance_id: ContractInstanceId,
    /// Transaction associated with this request.
    id: Transaction,
    /// Copied verbatim into `AwaitingResponseData::fetch_contract`.
    fetch_contract: bool,
    /// Copied verbatim into `AwaitingResponseData::subscribe`.
    subscribe: bool,
    /// Copied verbatim into `AwaitingResponseData::blocking_subscribe`.
    blocking_subscribe: bool,
}

impl PrepareRequestData {
    /// Consumes the prepared request and produces the bookkeeping record used
    /// while waiting for an answer from `next_hop`.
    ///
    /// Counters (`retries`, `current_hop`, `attempts_at_hop`) start at zero and
    /// `tried_peers` starts empty; the transaction `id` is not carried over.
    #[allow(dead_code)]
    fn into_awaiting_response(
        self,
        requester: Option<PeerKeyLocation>,
        next_hop: PeerKeyLocation,
        alternatives: Vec<PeerKeyLocation>,
        visited: super::VisitedPeers,
    ) -> AwaitingResponseData {
        let Self {
            instance_id,
            fetch_contract,
            subscribe,
            blocking_subscribe,
            ..
        } = self;
        let tried_peers = HashSet::new();

        AwaitingResponseData {
            instance_id,
            requester,
            fetch_contract,
            retries: 0,
            current_hop: 0,
            subscribe,
            blocking_subscribe,
            next_hop,
            tried_peers,
            alternatives,
            attempts_at_hop: 0,
            visited,
        }
    }
}
/// Payload of `GetState::AwaitingResponse`: bookkeeping for a GET that has been
/// handed to a peer and is waiting for an answer.
#[derive(Debug)]
struct AwaitingResponseData {
/// Instance id of the contract being requested.
instance_id: ContractInstanceId,
/// Peer the request came from; `GetOp::is_client_initiated` treats `None` as
/// "originated locally".
requester: Option<PeerKeyLocation>,
/// Carried over from the prepared request; presumably whether the contract code
/// should be returned along with the state — TODO confirm.
fetch_contract: bool,
/// Retry counter; starts at 0 when built via `into_awaiting_response`.
retries: usize,
/// Hop counter exposed through `GetOp::get_current_hop`; starts at 0.
current_hop: usize,
/// Carried over from the prepared request.
subscribe: bool,
/// Carried over from the prepared request.
blocking_subscribe: bool,
/// Peer the request was sent to; its socket address is what
/// `GetOp::get_next_hop_addr` returns.
#[allow(dead_code)]
next_hop: PeerKeyLocation,
/// Socket addresses already attempted; starts empty.
tried_peers: HashSet<std::net::SocketAddr>,
/// Other candidate peers — presumably tried if `next_hop` fails; confirm against
/// the retry logic, which is not in this view.
alternatives: Vec<PeerKeyLocation>,
/// Attempt counter for the current hop; starts at 0.
attempts_at_hop: usize,
/// Visited-peer set supplied by the caller of `into_awaiting_response`.
visited: super::VisitedPeers,
}
impl AwaitingResponseData {
/// Consumes the awaiting state and yields the terminal `FinishedData` for `key`.
/// Nothing from `self` is carried over.
#[allow(dead_code)] fn into_finished(self, key: ContractKey) -> FinishedData {
FinishedData { key }
}
}
/// Payload of `GetState::Finished`: the key of the contract the GET ended with.
#[derive(Debug, Clone, Copy)]
struct FinishedData {
/// Full contract key reported when the operation finished.
key: ContractKey,
}
/// State machine of a GET operation.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum GetState {
/// No per-request data attached; `GetOp::is_client_initiated` reports `false`
/// for this state. Presumably a request received from another peer — TODO confirm.
ReceivedRequest,
/// Request parameters collected but not yet awaiting a response;
/// `GetOp::is_client_initiated` reports `true` here unless `auto_fetch` is set.
PrepareRequest(PrepareRequestData),
/// Request handed to `next_hop`; waiting for the answer.
AwaitingResponse(AwaitingResponseData),
/// Terminal state; `IsOperationCompleted::is_completed` returns `true` only here.
Finished(FinishedData),
}
impl Display for GetState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GetState::ReceivedRequest => write!(f, "ReceivedRequest"),
GetState::PrepareRequest(data) => {
write!(
f,
"PrepareRequest(instance_id: {}, id: {}, fetch_contract: {}, subscribe: {})",
data.instance_id, data.id, data.fetch_contract, data.subscribe
)
}
GetState::AwaitingResponse(data) => {
write!(
f,
"AwaitingResponse(requester: {:?}, fetch_contract: {}, retries: {}, current_hop: {}, subscribe: {})",
data.requester,
data.fetch_contract,
data.retries,
data.current_hop,
data.subscribe
)
}
GetState::Finished(data) => write!(f, "Finished(key: {})", data.key),
}
}
}
/// Routing target and timing measurements gathered for a GET; consumed by
/// `GetOp::outcome` and `GetOp::failure_routing_info`.
struct GetStats {
    /// Peer reported as `target_peer` in the operation outcome, when known.
    next_peer: Option<PeerKeyLocation>,
    /// Ring location reported alongside the target peer.
    contract_location: Location,
    /// `(start, end)` of the first-response measurement; `end` stays `None` until
    /// [`GetStats::record_response_end`] is called.
    first_response_time: Option<(Instant, Option<Instant>)>,
    /// `(start, end)` of the transfer measurement; `end` stays `None` until
    /// [`GetStats::record_transfer_end`] is called.
    transfer_time: Option<(Instant, Option<Instant>)>,
}

impl GetStats {
    /// Starts (or restarts) both measurements from the same instant, discarding
    /// any previously recorded end times.
    fn start_timers(&mut self) {
        let started = Instant::now();
        let running = Some((started, None));
        self.first_response_time = running;
        self.transfer_time = running;
    }

    /// Stamps the end of the first-response measurement; no-op if it was never started.
    fn record_response_end(&mut self) {
        if let Some((_, finished_at)) = self.first_response_time.as_mut() {
            *finished_at = Some(Instant::now());
        }
    }

    /// Stamps the end of the transfer measurement; no-op if it was never started.
    fn record_transfer_end(&mut self) {
        if let Some((_, finished_at)) = self.transfer_time.as_mut() {
            *finished_at = Some(Instant::now());
        }
    }
}
/// Successful result of a GET: the contract key, its state and, optionally, the
/// contract itself.
#[derive(Clone, Debug)]
pub(crate) struct GetResult {
    /// Key of the contract that was retrieved.
    key: ContractKey,
    /// Retrieved contract state.
    pub state: WrappedState,
    /// Contract container, when one was obtained.
    pub contract: Option<ContractContainer>,
}

impl GetResult {
    /// Bundles the three parts of a GET result.
    pub(crate) fn new(
        key: ContractKey,
        state: WrappedState,
        contract: Option<ContractContainer>,
    ) -> Self {
        Self { key, state, contract }
    }
}

/// Extracts the stored result from a `GetOp`.
impl TryFrom<GetOp> for GetResult {
    type Error = OpError;

    /// Returns the operation's result, or `OpError::UnexpectedOpState` when the
    /// operation holds none.
    fn try_from(value: GetOp) -> Result<Self, Self::Error> {
        value.result.ok_or(OpError::UnexpectedOpState)
    }
}
pub(crate) struct GetOp {
pub id: Transaction,
state: Option<GetState>,
pub(super) result: Option<GetResult>,
stats: Option<Box<GetStats>>,
upstream_addr: Option<std::net::SocketAddr>,
local_fallback: Option<(ContractKey, WrappedState, Option<ContractContainer>)>,
auto_fetch: bool,
client_return_code: bool,
}
impl GetOp {
pub(super) fn outcome(&self) -> OpOutcome<'_> {
if let Some((
GetResult {
state, contract, ..
},
GetStats {
next_peer: Some(target_peer),
contract_location,
first_response_time: Some((response_start, Some(response_end))),
transfer_time: Some((transfer_start, Some(transfer_end))),
..
},
)) = self.result.as_ref().zip(self.stats.as_deref())
{
let payload_size = state.size()
+ contract
.as_ref()
.map(|c| c.data().len())
.unwrap_or_default();
OpOutcome::ContractOpSuccess {
target_peer,
contract_location: *contract_location,
payload_size,
first_response_time: *response_end - *response_start,
payload_transfer_time: *transfer_end - *transfer_start,
}
} else if self.result.is_none() {
if let Some(GetStats {
next_peer: Some(target_peer),
contract_location,
..
}) = self.stats.as_deref()
{
OpOutcome::ContractOpFailure {
target_peer,
contract_location: *contract_location,
}
} else {
OpOutcome::Incomplete
}
} else if let Some(GetStats {
next_peer: Some(target_peer),
contract_location,
..
}) = self.stats.as_deref()
{
OpOutcome::ContractOpSuccessUntimed {
target_peer,
contract_location: *contract_location,
}
} else {
OpOutcome::Incomplete
}
}
pub(crate) fn instance_id(&self) -> Option<ContractInstanceId> {
match &self.state {
Some(GetState::PrepareRequest(data)) => Some(data.instance_id),
Some(GetState::AwaitingResponse(data)) => Some(data.instance_id),
_ => None,
}
}
pub(crate) fn is_client_initiated(&self) -> bool {
if self.auto_fetch {
return false;
}
match &self.state {
Some(GetState::PrepareRequest(_)) => true,
Some(GetState::AwaitingResponse(data)) => data.requester.is_none(),
Some(GetState::ReceivedRequest) | Some(GetState::Finished(_)) | None => false,
}
}
pub(crate) fn failure_routing_info(&self) -> Option<(PeerKeyLocation, Location)> {
self.stats.as_deref().and_then(|stats| {
stats
.next_peer
.as_ref()
.map(|peer| (peer.clone(), stats.contract_location))
})
}
pub(super) fn finalized(&self) -> bool {
self.result.is_some() && matches!(self.state, Some(GetState::Finished(_)))
}
pub(super) fn to_host_result(&self) -> HostResult {
match &self.result {
Some(GetResult {
key,
state,
contract,
}) => {
let client_contract = if self.client_return_code {
contract.clone()
} else {
None
};
Ok(HostResponse::ContractResponse(
freenet_stdlib::client_api::ContractResponse::GetResponse {
key: *key,
contract: client_contract,
state: state.clone(),
},
))
}
None => Err(ErrorKind::OperationError {
cause: "get didn't finish successfully".into(),
}
.into()),
}
}
pub(crate) fn get_next_hop_addr(&self) -> Option<std::net::SocketAddr> {
match &self.state {
Some(GetState::AwaitingResponse(data)) => data.next_hop.socket_addr(),
_ => None,
}
}
pub(crate) fn get_current_hop(&self) -> Option<usize> {
match &self.state {
Some(GetState::AwaitingResponse(data)) => Some(data.current_hop),
_ => None,
}
}
}
impl IsOperationCompleted for GetOp {
fn is_completed(&self) -> bool {
matches!(self.state, Some(GetState::Finished(_)))
}
}
mod messages {
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use super::*;
use crate::transport::peer_connection::StreamId;
/// Key/value pair for a GET response; judging by the name this is the payload
/// sent over a stream rather than inline — TODO confirm against the streaming code.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct GetStreamingPayload {
/// Key of the contract the payload belongs to.
pub key: ContractKey,
/// Stored state and optional contract.
pub value: StoreResponse,
}
/// Result carried by `GetMsg::Response`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) enum GetMsgResult {
/// The contract was found; carries its key and the stored value.
Found {
key: ContractKey,
value: StoreResponse,
},
/// The contract was not found.
NotFound,
}
/// Messages exchanged for a GET operation. Every variant carries the transaction `id`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) enum GetMsg {
/// Request for the contract identified by `instance_id`.
Request {
id: Transaction,
instance_id: ContractInstanceId,
fetch_contract: bool,
// presumably "hops to live" — TODO confirm
htl: usize,
visited: super::super::VisitedPeers,
// `serde(default)`: the field may be absent on deserialization where the
// format allows it, in which case it is `false`.
#[serde(default)]
subscribe: bool,
},
/// Inline answer: either the value or `NotFound`.
Response {
id: Transaction,
instance_id: ContractInstanceId,
result: GetMsgResult,
},
/// Answer announcing data identified by `stream_id`; `total_size` and
/// `includes_contract` describe it (exact semantics not visible here).
ResponseStreaming {
id: Transaction,
instance_id: ContractInstanceId,
stream_id: StreamId,
key: ContractKey,
total_size: u64,
includes_contract: bool,
},
/// Acknowledgement referring to a `stream_id`; the only variant without an `instance_id`.
ResponseStreamingAck {
id: Transaction,
stream_id: StreamId,
},
/// Acknowledgement carrying only the transaction and instance id.
ForwardingAck {
id: Transaction,
instance_id: ContractInstanceId,
},
}
impl InnerMessage for GetMsg {
    /// Transaction id; present in every variant.
    fn id(&self) -> &Transaction {
        let (Self::Request { id, .. }
        | Self::Response { id, .. }
        | Self::ResponseStreaming { id, .. }
        | Self::ResponseStreamingAck { id, .. }
        | Self::ForwardingAck { id, .. }) = self;
        id
    }

    /// Location derived from the message's `instance_id`.
    ///
    /// `ResponseStreamingAck` carries no instance id, so it yields `None`.
    fn requested_location(&self) -> Option<Location> {
        let instance_id = match self {
            Self::ResponseStreamingAck { .. } => return None,
            Self::Request { instance_id, .. }
            | Self::Response { instance_id, .. }
            | Self::ResponseStreaming { instance_id, .. }
            | Self::ForwardingAck { instance_id, .. } => instance_id,
        };
        Some(Location::from(instance_id))
    }
}
/// One-line summary of the message: variant name, transaction id and the
/// identifying fields of the variant.
impl Display for GetMsg {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Request {
                id,
                instance_id,
                htl,
                ..
            } => write!(
                f,
                "Get::Request(id: {id}, instance_id: {instance_id}, htl: {htl})"
            ),
            Self::Response {
                id,
                instance_id,
                result,
            } => {
                // Only the key of a found value is printed, never the value itself.
                let result_str = match result {
                    GetMsgResult::NotFound => String::from("NotFound"),
                    GetMsgResult::Found { key, .. } => format!("Found({key})"),
                };
                write!(
                    f,
                    "Get::Response(id: {id}, instance_id: {instance_id}, result: {result_str})"
                )
            }
            Self::ResponseStreaming {
                id,
                instance_id,
                stream_id,
                key,
                total_size,
                ..
            } => write!(
                f,
                "Get::ResponseStreaming(id: {id}, instance_id: {instance_id}, key: {key}, stream: {stream_id}, size: {total_size})"
            ),
            Self::ResponseStreamingAck { id, stream_id } => write!(
                f,
                "Get::ResponseStreamingAck(id: {id}, stream: {stream_id})"
            ),
            Self::ForwardingAck { id, instance_id } => write!(
                f,
                "Get::ForwardingAck(id: {id}, instance_id: {instance_id})"
            ),
        }
    }
}
}
#[cfg(test)]
#[allow(clippy::wildcard_enum_match_arm)]
mod tests {
use super::*;
use crate::message::Transaction;
use crate::operations::VisitedPeers;
use crate::operations::test_utils::{make_contract_key, make_peer, make_test_contract};
/// Builds a `GetOp` with a fresh transaction id and the given state/result.
/// All other fields take fixed values: no stats, no upstream address, no local
/// fallback, `auto_fetch = false`, `client_return_code = true`.
fn make_get_op(state: Option<GetState>, result: Option<GetResult>) -> GetOp {
    let id = Transaction::new::<GetMsg>();
    GetOp {
        id,
        state,
        result,
        stats: None,
        upstream_addr: None,
        local_fallback: None,
        auto_fetch: false,
        client_return_code: true,
    }
}
/// `Finished` state plus a stored result counts as finalized.
#[test]
fn get_op_finalized_when_finished_with_result() {
    let key = make_contract_key(1);
    let finished = GetState::Finished(FinishedData { key });
    let stored = GetResult::new(key, WrappedState::new(vec![1, 2, 3]), None);

    let op = make_get_op(Some(finished), Some(stored));

    assert!(
        op.finalized(),
        "GetOp should be finalized when state is Finished and result is present"
    );
}
/// `Finished` state alone is not enough: a result must be stored too.
#[test]
fn get_op_not_finalized_when_finished_without_result() {
    let finished = GetState::Finished(FinishedData {
        key: make_contract_key(1),
    });
    let op = make_get_op(Some(finished), None);

    assert!(
        !op.finalized(),
        "GetOp should not be finalized when state is Finished but result is None"
    );
}
/// A freshly received request is not finalized.
#[test]
fn get_op_not_finalized_when_received_request() {
    let received = make_get_op(Some(GetState::ReceivedRequest), None);
    let is_finalized = received.finalized();
    assert!(
        !is_finalized,
        "GetOp should not be finalized in ReceivedRequest state"
    );
}
/// An operation without any state is not finalized.
#[test]
fn get_op_not_finalized_when_state_is_none() {
    let stateless = make_get_op(None, None);
    let is_finalized = stateless.finalized();
    assert!(
        !is_finalized,
        "GetOp should not be finalized when state is None"
    );
}
/// With a stored result, `to_host_result` yields a `GetResponse` carrying the same
/// key and state.
#[test]
fn get_op_to_host_result_success_when_result_present() {
    let key = make_contract_key(1);
    let expected_state = WrappedState::new(vec![1, 2, 3]);
    let stored = GetResult::new(key, expected_state.clone(), None);
    let op = make_get_op(Some(GetState::Finished(FinishedData { key })), Some(stored));

    let host_result = op.to_host_result();
    assert!(
        host_result.is_ok(),
        "to_host_result should return Ok when result is present"
    );

    let Ok(HostResponse::ContractResponse(
        freenet_stdlib::client_api::ContractResponse::GetResponse {
            key: returned_key,
            state: returned_state,
            ..
        },
    )) = host_result
    else {
        panic!("Expected GetResponse");
    };
    assert_eq!(returned_key, key, "Returned key should match");
    assert_eq!(returned_state, expected_state, "Returned state should match");
}
/// Without a stored result, `to_host_result` reports an error.
#[test]
fn get_op_to_host_result_error_when_no_result() {
    let pending = make_get_op(Some(GetState::ReceivedRequest), None);
    let host_result = pending.to_host_result();
    assert!(
        host_result.is_err(),
        "to_host_result should return Err when result is None"
    );
}
/// The contract container reaches the client only when `client_return_code` is set.
#[test]
fn get_op_to_host_result_strips_contract_when_client_return_code_false() {
    // Extracts the `contract` field of the response, panicking on any other shape.
    let contract_in_response = |op: &GetOp| -> Option<ContractContainer> {
        match op.to_host_result() {
            Ok(HostResponse::ContractResponse(
                freenet_stdlib::client_api::ContractResponse::GetResponse { contract, .. },
            )) => contract,
            _ => panic!("Expected Ok(GetResponse)"),
        }
    };

    let key = make_contract_key(1);
    let stored = GetResult::new(
        key,
        WrappedState::new(vec![1, 2, 3]),
        Some(make_test_contract(&[42u8; 100])),
    );
    let mut op = make_get_op(Some(GetState::Finished(FinishedData { key })), Some(stored));

    op.client_return_code = true;
    assert!(
        contract_in_response(&op).is_some(),
        "Contract should be included when client_return_code=true"
    );

    op.client_return_code = false;
    assert!(
        contract_in_response(&op).is_none(),
        "Contract should be stripped when client_return_code=false"
    );
}
/// Even with a result, a missing `stats` makes the outcome `Incomplete`.
#[test]
fn get_op_outcome_incomplete_without_stats() {
    let key = make_contract_key(1);
    let stored = GetResult::new(key, WrappedState::new(vec![]), None);
    let op = make_get_op(Some(GetState::Finished(FinishedData { key })), Some(stored));

    assert!(matches!(op.outcome(), OpOutcome::Incomplete));
}
/// `InnerMessage::id` hands back the transaction stored in the message.
#[test]
fn get_msg_id_returns_transaction() {
    let tx = Transaction::new::<GetMsg>();
    let instance_id = *make_contract_key(1).id();
    let visited = VisitedPeers::new(&tx);
    let request = GetMsg::Request {
        id: tx,
        instance_id,
        fetch_contract: false,
        htl: 5,
        visited,
        subscribe: false,
    };

    assert_eq!(*request.id(), tx, "id() should return the transaction ID");
}
/// The `Display` output of a request names the variant.
#[test]
fn get_msg_display_formats_correctly() {
    let tx = Transaction::new::<GetMsg>();
    let instance_id = *make_contract_key(1).id();
    let visited = VisitedPeers::new(&tx);
    let request = GetMsg::Request {
        id: tx,
        instance_id,
        fetch_contract: false,
        htl: 5,
        visited,
        subscribe: false,
    };

    let rendered = request.to_string();
    assert!(
        rendered.contains("Request"),
        "Display should contain message type name"
    );
}
/// `ReceivedRequest` renders its own name.
#[test]
fn get_state_display_received_request() {
    let rendered = GetState::ReceivedRequest.to_string();
    assert!(
        rendered.contains("ReceivedRequest"),
        "Display should contain state name"
    );
}
/// `Finished` renders its own name.
#[test]
fn get_state_display_finished() {
    let key = make_contract_key(1);
    let rendered = GetState::Finished(FinishedData { key }).to_string();
    assert!(
        rendered.contains("Finished"),
        "Display should contain state name"
    );
}
/// A `Found` response renders both the variant name and the result kind.
#[test]
fn get_msg_response_found_display_formats_correctly() {
    let tx = Transaction::new::<GetMsg>();
    let key = make_contract_key(1);
    let value = StoreResponse {
        state: Some(WrappedState::new(vec![1, 2, 3])),
        contract: None,
    };
    let response = GetMsg::Response {
        id: tx,
        instance_id: *key.id(),
        result: GetMsgResult::Found { key, value },
    };

    let rendered = response.to_string();
    assert!(
        rendered.contains("Response"),
        "Display should contain message type name"
    );
    assert!(
        rendered.contains("Found"),
        "Display should indicate Found result"
    );
}
/// A `NotFound` response renders both the variant name and the result kind.
#[test]
fn get_msg_response_notfound_display_formats_correctly() {
    let response = GetMsg::Response {
        id: Transaction::new::<GetMsg>(),
        instance_id: *make_contract_key(1).id(),
        result: GetMsgResult::NotFound,
    };

    let rendered = response.to_string();
    assert!(
        rendered.contains("Response"),
        "Display should contain message type name"
    );
    assert!(
        rendered.contains("NotFound"),
        "Display should indicate NotFound result"
    );
}
/// `Found` exposes the key and value it was built with.
#[test]
fn get_msg_result_found_contains_key_and_value() {
    let key = make_contract_key(1);
    let state = WrappedState::new(vec![1, 2, 3]);
    let result = GetMsgResult::Found {
        key,
        value: StoreResponse {
            state: Some(state.clone()),
            contract: None,
        },
    };

    let GetMsgResult::Found {
        key: found_key,
        value,
    } = result
    else {
        panic!("Expected Found variant");
    };
    assert_eq!(found_key, key);
    assert_eq!(value.state, Some(state));
}
/// `NotFound` can be constructed and matched without any payload.
#[test]
fn get_msg_result_notfound_is_unit_variant() {
    let result = GetMsgResult::NotFound;
    match result {
        GetMsgResult::NotFound => {}
        GetMsgResult::Found { .. } => panic!("NotFound should match NotFound"),
    }
}
/// Both `Found` and `NotFound` responses report the location of their `instance_id`.
#[test]
fn get_msg_response_requested_location_uses_instance_id() {
    let tx = Transaction::new::<GetMsg>();
    let key = make_contract_key(1);
    let instance_id = *key.id();

    let found = GetMsg::Response {
        id: tx,
        instance_id,
        result: GetMsgResult::Found {
            key,
            value: StoreResponse {
                state: Some(WrappedState::new(vec![])),
                contract: None,
            },
        },
    };
    let found_location = found.requested_location();
    assert!(
        found_location.is_some(),
        "Response should have a requested location"
    );
    assert_eq!(
        found_location.unwrap(),
        Location::from(&instance_id),
        "Location should be derived from instance_id"
    );

    let not_found = GetMsg::Response {
        id: tx,
        instance_id,
        result: GetMsgResult::NotFound,
    };
    let not_found_location = not_found.requested_location();
    assert!(
        not_found_location.is_some(),
        "NotFound Response should have a requested location"
    );
    assert_eq!(
        not_found_location.unwrap(),
        Location::from(&instance_id),
        "Location should be derived from instance_id for NotFound too"
    );
}
/// Covers three `outcome()` cases: failure (stats, no result), incomplete
/// (no stats) and timed success (result plus complete timings).
#[test]
fn test_failure_outcome_for_get() {
    use crate::ring::{Location, PeerKeyLocation};
    use std::time::Duration;

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();

    // Stats with a target peer but no result => failure attributed to that peer.
    let mut failed = make_get_op(None, None);
    failed.stats = Some(Box::new(GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: None,
        transfer_time: None,
    }));
    match failed.outcome() {
        OpOutcome::ContractOpFailure {
            target_peer: peer,
            contract_location: loc,
        } => {
            assert_eq!(*peer, target_peer);
            assert_eq!(loc, contract_location);
        }
        OpOutcome::ContractOpSuccess { .. }
        | OpOutcome::ContractOpSuccessUntimed { .. }
        | OpOutcome::Incomplete
        | OpOutcome::Irrelevant => panic!("Expected ContractOpFailure"),
    }

    // No stats at all => nothing to report.
    let without_stats = make_get_op(None, None);
    assert!(
        matches!(without_stats.outcome(), OpOutcome::Incomplete),
        "GetOp with no stats should return Incomplete"
    );

    // Result plus both timers completed => timed success.
    let started = Instant::now();
    let ended = started + Duration::from_millis(50);
    let mut succeeded = make_get_op(
        None,
        Some(GetResult {
            key: make_contract_key(1),
            state: WrappedState::new(vec![1, 2, 3]),
            contract: None,
        }),
    );
    succeeded.stats = Some(Box::new(GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: Some((started, Some(ended))),
        transfer_time: Some((started, Some(ended))),
    }));
    assert!(
        matches!(succeeded.outcome(), OpOutcome::ContractOpSuccess { .. }),
        "GetOp with result and complete stats should return ContractOpSuccess"
    );
}
/// A result with timers started but never stopped yields `ContractOpSuccessUntimed`.
#[test]
fn test_get_outcome_success_untimed_partial_timing() {
    use crate::ring::{Location, PeerKeyLocation};

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();
    let started = Instant::now();

    let mut op = make_get_op(
        Some(GetState::Finished(FinishedData {
            key: make_contract_key(1),
        })),
        Some(GetResult {
            key: make_contract_key(1),
            state: WrappedState::new(vec![1, 2, 3]),
            contract: None,
        }),
    );
    // Both timers have a start but no end.
    op.stats = Some(Box::new(GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: Some((started, None)),
        transfer_time: Some((started, None)),
    }));

    match op.outcome() {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: peer,
            contract_location: loc,
        } => {
            assert_eq!(*peer, target_peer);
            assert_eq!(loc, contract_location);
        }
        other @ OpOutcome::ContractOpSuccess { .. }
        | other @ OpOutcome::ContractOpFailure { .. }
        | other @ OpOutcome::Incomplete
        | other @ OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed, got {other:?}")
        }
    }
}
/// A result with no timing data at all yields `ContractOpSuccessUntimed`.
#[test]
fn test_get_outcome_success_untimed_no_transfer_time() {
    use crate::ring::{Location, PeerKeyLocation};

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();

    let mut op = make_get_op(
        Some(GetState::Finished(FinishedData {
            key: make_contract_key(1),
        })),
        Some(GetResult {
            key: make_contract_key(1),
            state: WrappedState::new(vec![1, 2, 3]),
            contract: None,
        }),
    );
    // Target peer known, neither timer ever started.
    op.stats = Some(Box::new(GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: None,
        transfer_time: None,
    }));

    match op.outcome() {
        OpOutcome::ContractOpSuccessUntimed {
            target_peer: peer,
            contract_location: loc,
        } => {
            assert_eq!(*peer, target_peer);
            assert_eq!(loc, contract_location);
        }
        other @ OpOutcome::ContractOpSuccess { .. }
        | other @ OpOutcome::ContractOpFailure { .. }
        | other @ OpOutcome::Incomplete
        | other @ OpOutcome::Irrelevant => {
            panic!("Expected ContractOpSuccessUntimed, got {other:?}")
        }
    }
}
/// Stats without a target peer make the outcome `Incomplete`, even with a result.
#[test]
fn test_get_outcome_incomplete_result_no_peer() {
    let mut op = make_get_op(
        Some(GetState::Finished(FinishedData {
            key: make_contract_key(1),
        })),
        Some(GetResult {
            key: make_contract_key(1),
            state: WrappedState::new(vec![1, 2, 3]),
            contract: None,
        }),
    );
    op.stats = Some(Box::new(GetStats {
        next_peer: None,
        contract_location: Location::random(),
        first_response_time: None,
        transfer_time: None,
    }));

    assert!(
        matches!(op.outcome(), OpOutcome::Incomplete),
        "Result with no target_peer should return Incomplete"
    );
}
/// Partial timing data does not change the verdict when there is no result:
/// the outcome is still `ContractOpFailure`.
#[test]
fn test_get_outcome_failure_no_result_partial_timing() {
    use crate::ring::{Location, PeerKeyLocation};

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();
    let started = Instant::now();

    let mut op = make_get_op(None, None);
    op.stats = Some(Box::new(GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: Some((started, None)),
        transfer_time: None,
    }));

    match op.outcome() {
        OpOutcome::ContractOpFailure {
            target_peer: peer,
            contract_location: loc,
        } => {
            assert_eq!(*peer, target_peer);
            assert_eq!(loc, contract_location);
        }
        other @ OpOutcome::ContractOpSuccess { .. }
        | other @ OpOutcome::ContractOpSuccessUntimed { .. }
        | other @ OpOutcome::Incomplete
        | other @ OpOutcome::Irrelevant => panic!("Expected ContractOpFailure, got {other:?}"),
    }
}
/// A GET still in `PrepareRequest` is client-initiated.
#[test]
fn is_client_initiated_true_for_prepare_request() {
    let prepared = PrepareRequestData {
        instance_id: ContractInstanceId::new([1u8; 32]),
        id: Transaction::new::<GetMsg>(),
        fetch_contract: true,
        subscribe: false,
        blocking_subscribe: false,
    };
    let op = make_get_op(Some(GetState::PrepareRequest(prepared)), None);

    assert!(op.is_client_initiated());
}
/// An awaiting GET that records a requester was not started by a local client.
#[test]
fn is_client_initiated_false_for_awaiting_with_requester() {
    let id = Transaction::new::<GetMsg>();
    let instance_id = ContractInstanceId::new([42u8; 32]);
    let visited = VisitedPeers::new(&id);
    let requester = make_peer(6001);
    let next_hop = make_peer(6002);

    let awaiting = AwaitingResponseData {
        instance_id,
        retries: 0,
        fetch_contract: true,
        requester: Some(requester),
        current_hop: 7,
        subscribe: false,
        blocking_subscribe: false,
        next_hop,
        tried_peers: HashSet::new(),
        alternatives: Vec::new(),
        attempts_at_hop: 1,
        visited,
    };
    let op = GetOp {
        id,
        state: Some(GetState::AwaitingResponse(awaiting)),
        result: None,
        stats: None,
        upstream_addr: None,
        local_fallback: None,
        auto_fetch: false,
        client_return_code: true,
    };

    assert!(!op.is_client_initiated());
}
/// `auto_fetch` overrides everything else: even an awaiting GET without a
/// requester is not client-initiated.
#[test]
fn is_client_initiated_false_for_auto_fetch() {
    let id = Transaction::new::<GetMsg>();
    let instance_id = ContractInstanceId::new([77u8; 32]);
    let target = make_peer(7001);
    let visited = VisitedPeers::new(&id);
    // Zero or one entry, depending on whether the target has a socket address.
    let tried_peers: HashSet<_> = target.socket_addr().into_iter().collect();

    let awaiting = AwaitingResponseData {
        instance_id,
        retries: 0,
        fetch_contract: true,
        requester: None,
        current_hop: 10,
        subscribe: false,
        blocking_subscribe: false,
        next_hop: target,
        tried_peers,
        alternatives: Vec::new(),
        attempts_at_hop: 1,
        visited,
    };
    let op = GetOp {
        id,
        state: Some(GetState::AwaitingResponse(awaiting)),
        result: None,
        stats: None,
        upstream_addr: None,
        local_fallback: None,
        auto_fetch: true,
        client_return_code: true,
    };

    assert!(
        !op.is_client_initiated(),
        "Auto-fetch GET should not be client-initiated"
    );
}
/// `ReceivedRequest`, `Finished` and a missing state are never client-initiated.
#[test]
fn is_client_initiated_false_for_other_states() {
    let key = make_contract_key(1);
    let states = [
        Some(GetState::ReceivedRequest),
        Some(GetState::Finished(FinishedData { key })),
        None,
    ];
    for state in states {
        let op = make_get_op(state, None);
        assert!(!op.is_client_initiated());
    }
}
/// `ForwardingAck` survives a bincode serialize/deserialize cycle unchanged.
#[test]
fn forwarding_ack_serde_roundtrip() {
    let id = Transaction::new::<GetMsg>();
    let instance_id = ContractInstanceId::new([42; 32]);

    let encoded =
        bincode::serialize(&GetMsg::ForwardingAck { id, instance_id }).expect("serialize");
    let decoded: GetMsg = bincode::deserialize(&encoded).expect("deserialize");

    match decoded {
        GetMsg::ForwardingAck {
            id: deser_id,
            instance_id: deser_iid,
        } => {
            assert_eq!(deser_id, id);
            assert_eq!(deser_iid, instance_id);
        }
        other @ GetMsg::Request { .. }
        | other @ GetMsg::Response { .. }
        | other @ GetMsg::ResponseStreaming { .. }
        | other @ GetMsg::ResponseStreamingAck { .. } => {
            panic!("Expected ForwardingAck, got {other}")
        }
    }
}
use crate::ring::Location;
fn make_get_op_with_stats(stats: Option<Box<GetStats>>) -> GetOp {
GetOp {
id: Transaction::new::<GetMsg>(),
state: Some(GetState::ReceivedRequest),
result: None,
stats,
upstream_addr: None,
local_fallback: None,
auto_fetch: false,
client_return_code: true,
}
}
/// Builds boxed stats that name `target` as the next peer and carry no timing data.
fn make_get_stats(
    target: PeerKeyLocation,
    contract_location: Location,
) -> Option<Box<GetStats>> {
    let untimed = GetStats {
        next_peer: Some(target),
        contract_location,
        first_response_time: None,
        transfer_time: None,
    };
    Some(Box::new(untimed))
}
/// Stats with a target peer and no result produce `ContractOpFailure` for that peer.
#[test]
fn test_get_failure_outcome_with_stats() {
    let target = make_peer(9001);
    let contract_location = Location::random();
    let op = make_get_op_with_stats(make_get_stats(target.clone(), contract_location));

    let outcome = op.outcome();
    let OpOutcome::ContractOpFailure {
        target_peer,
        contract_location: loc,
    } = outcome
    else {
        panic!("Expected ContractOpFailure, got {outcome:?}")
    };
    assert_eq!(target_peer, &target);
    assert_eq!(loc, contract_location);
}
/// Without stats there is nothing to attribute, so the outcome is `Incomplete`.
#[test]
fn test_get_failure_outcome_without_stats() {
    let without_stats = make_get_op_with_stats(None);
    let is_incomplete = matches!(without_stats.outcome(), OpOutcome::Incomplete);
    assert!(is_incomplete, "GET without stats should return Incomplete");
}
/// `failure_routing_info` returns the target peer and contract location from the stats.
#[test]
fn test_get_failure_routing_info() {
    let target = make_peer(9002);
    let contract_location = Location::random();
    let stats = make_get_stats(target.clone(), contract_location);

    let routing_info = make_get_op_with_stats(stats).failure_routing_info();

    let (peer, loc) = routing_info.expect("should have routing info");
    assert_eq!(peer, target);
    assert_eq!(loc, contract_location);
}
/// The `subscribe` flag of a request survives a bincode round trip for both values.
#[test]
fn test_get_msg_subscribe_roundtrip() {
    use freenet_stdlib::prelude::ContractInstanceId;

    // (instance-id fill byte, subscribe flag)
    for (fill, expected) in [(1u8, true), (2u8, false)] {
        let request = GetMsg::Request {
            id: Transaction::new::<GetMsg>(),
            instance_id: ContractInstanceId::new([fill; 32]),
            fetch_contract: true,
            htl: 10,
            visited: VisitedPeers::default(),
            subscribe: expected,
        };

        let encoded = bincode::serialize(&request).unwrap();
        let decoded: GetMsg = bincode::deserialize(&encoded).unwrap();

        let GetMsg::Request { subscribe, .. } = decoded else {
            panic!("expected Request")
        };
        assert_eq!(subscribe, expected);
    }
}
/// Drives the timer helpers through start -> response end -> transfer end and
/// checks that `outcome()` then reports positive, ordered durations.
#[test]
fn test_get_stats_timer_methods_produce_valid_timing() {
    use crate::ring::{Location, PeerKeyLocation};
    use std::time::Duration;

    let target_peer = PeerKeyLocation::random();
    let contract_location = Location::random();
    let mut stats = GetStats {
        next_peer: Some(target_peer.clone()),
        contract_location,
        first_response_time: None,
        transfer_time: None,
    };

    // After starting, both timers have a start and no end.
    stats.start_timers();
    assert!(matches!(stats.first_response_time, Some((_, None))));
    assert!(matches!(stats.transfer_time, Some((_, None))));

    // Ending the response timer leaves the transfer timer running.
    std::thread::sleep(Duration::from_millis(1));
    stats.record_response_end();
    assert!(matches!(stats.first_response_time, Some((_, Some(_)))));
    assert!(matches!(stats.transfer_time, Some((_, None))));

    std::thread::sleep(Duration::from_millis(1));
    stats.record_transfer_end();
    assert!(matches!(stats.transfer_time, Some((_, Some(_)))));

    let mut op = make_get_op(
        None,
        Some(GetResult {
            key: make_contract_key(99),
            state: WrappedState::new(vec![1, 2, 3]),
            contract: None,
        }),
    );
    op.stats = Some(Box::new(stats));

    let outcome = op.outcome();
    let OpOutcome::ContractOpSuccess {
        first_response_time,
        payload_transfer_time,
        ..
    } = outcome
    else {
        panic!("Expected ContractOpSuccess, got {outcome:?}")
    };
    assert!(
        first_response_time > Duration::ZERO,
        "first_response_time should be positive, got {first_response_time:?}"
    );
    assert!(
        payload_transfer_time > Duration::ZERO,
        "payload_transfer_time should be positive, got {payload_transfer_time:?}"
    );
    assert!(
        payload_transfer_time >= first_response_time,
        "transfer should take at least as long as response"
    );
}
/// Calling `start_timers` again moves the start forward and clears the end times.
#[test]
fn test_get_stats_start_timers_resets_on_retry() {
    let mut stats = GetStats {
        next_peer: None,
        contract_location: Location::random(),
        first_response_time: None,
        transfer_time: None,
    };

    stats.start_timers();
    let (first_start, _) = stats.first_response_time.unwrap();

    std::thread::sleep(std::time::Duration::from_millis(1));

    stats.start_timers();
    let (second_start, _) = stats.first_response_time.unwrap();

    assert!(
        second_start > first_start,
        "Retry should reset timers to a later instant"
    );
    assert!(stats.first_response_time.unwrap().1.is_none());
    assert!(stats.transfer_time.unwrap().1.is_none());
}
/// A locally cached contract: key, state and optional contract container.
type LocalValue = Option<(ContractKey, WrappedState, Option<ContractContainer>)>;

/// Test-local model of the relay cache decision.
///
/// Returns `(value_to_serve, fallback)`:
/// * a relay that holds a cached value but has no local interest serves nothing
///   and keeps the cached value as the fallback;
/// * in every other case the cached value (possibly `None`) is served as-is and
///   there is no fallback.
fn apply_relay_cache_decision(
    is_relay: bool,
    has_local_interest: bool,
    local_value: LocalValue,
) -> (LocalValue, LocalValue) {
    let defer_to_network = is_relay && !has_local_interest && local_value.is_some();
    if defer_to_network {
        (None, local_value)
    } else {
        (local_value, None)
    }
}
/// A relay with local interest in the contract serves its cached value directly.
#[test]
fn relay_peer_with_local_interest_serves_immediately() {
    let key = make_contract_key(1);
    let state = WrappedState::new(vec![1, 2, 3]);
    // The cached value is moved into the helper; nothing reads it afterwards, so
    // no clones are needed.
    let local_value = Some((key, state, None));

    let (value, fallback) = apply_relay_cache_decision(true, true, local_value);

    assert!(
        value.is_some(),
        "Relay peer with local interest must serve immediately"
    );
    assert!(
        fallback.is_none(),
        "Should not defer to fallback when actively hosting"
    );
}
/// A relay without local interest does not serve its cached value, but keeps it
/// as a fallback.
#[test]
fn relay_peer_with_stale_cache_defers_with_fallback() {
    let key = make_contract_key(1);
    let state = WrappedState::new(vec![1, 2, 3]);
    // The cached value is moved into the helper; nothing reads it afterwards, so
    // no clones are needed.
    let local_value = Some((key, state, None));

    let (value, fallback) = apply_relay_cache_decision(true, false, local_value);

    assert!(
        value.is_none(),
        "Relay peer without local interest should defer to network"
    );
    assert!(fallback.is_some(), "Stale cache should be kept as fallback");
}
/// A non-relay (the original requester) serves its cached value regardless of
/// local interest, with no fallback.
#[test]
fn original_requester_always_serves_local_cache() {
    let key = make_contract_key(1);
    let state = WrappedState::new(vec![1, 2, 3]);
    // `state` is not used again, so it is moved rather than cloned.
    let local_value = Some((key, state, None));

    let (value, fallback) = apply_relay_cache_decision(false, false, local_value);

    assert!(value.is_some(), "Original requester always serves locally");
    assert!(fallback.is_none());
}
/// With nothing cached there is neither a value to serve nor a fallback.
#[test]
fn relay_peer_without_cache_forwards_to_network() {
    let nothing_cached: LocalValue = None;
    let (value, fallback) = apply_relay_cache_decision(true, true, nothing_cached);

    assert!(value.is_none(), "Nothing to serve without cache");
    assert!(fallback.is_none(), "Nothing to fall back to");
}
}