use super::*;
/// Source-text pin for `handle_unsubscribe_inbound` in `subscribe.rs`.
///
/// The file is embedded at compile time and searched as plain text. The
/// inspected window starts at the function signature and ends at the first
/// `#[cfg(test)]` attribute that begins a line after it, so it may span more
/// than the function itself. Each required snippet must occur somewhere in
/// that window; the first missing one fails the test with its explanation.
#[test]
fn handle_unsubscribe_inbound_preserves_legacy_branches() {
    const SOURCE: &str = include_str!("../subscribe.rs");
    const SIGNATURE: &str = "pub(crate) async fn handle_unsubscribe_inbound(";
    const END_ANCHOR: &str = "\n#[cfg(test)]";

    let signature_at = SOURCE.find(SIGNATURE).expect(
        "handle_unsubscribe_inbound not found in subscribe.rs — \
         rename or removal must trip this pin",
    );
    // Everything from the signature up to (not including) the end anchor.
    let (body, _after_anchor) = SOURCE[signature_at..]
        .split_once(END_ANCHOR)
        .expect("subsequent item anchor not found in subscribe.rs");

    // (snippet that must be present, failure explanation), checked in order.
    let required: [(&str, &str); 8] = [
        (
            "Contract not found locally, ignoring unsubscribe",
            "handle_unsubscribe_inbound must keep the contract-not-found \
             early-return branch — silent drop is the expected legacy \
             behavior when the relay doesn't host the contract",
        ),
        (
            "Contract lookup failed while handling unsubscribe",
            "handle_unsubscribe_inbound must keep the contract-lookup-error \
             branch — `has_contract` errors must not corrupt downstream state",
        ),
        (
            "could not resolve sender peer, downstream entry not removed",
            "handle_unsubscribe_inbound must keep the unresolvable-sender warn \
             branch — silent removal of an arbitrary peer would corrupt \
             downstream-subscriber accounting",
        ),
        (
            "remove_downstream_subscriber(&key, peer)",
            "handle_unsubscribe_inbound must call `ring.remove_downstream_subscriber` \
             to remove the sender from the per-contract downstream list",
        ),
        (
            "remove_peer_interest(&key, peer)",
            "handle_unsubscribe_inbound must call `interest_manager.remove_peer_interest` \
             to drop the sender from the per-peer interest registry",
        ),
        (
            "should_unsubscribe_upstream(&key)",
            "handle_unsubscribe_inbound must gate the upstream propagation on \
             `ring.should_unsubscribe_upstream` — chain-propagating without \
             the gate would over-unsubscribe contracts that still have \
             downstream subscribers",
        ),
        (
            "send_unsubscribe_upstream(&key)",
            "handle_unsubscribe_inbound must call `op_manager.send_unsubscribe_upstream` \
             when interest hits zero — without this the chain breaks and \
             intermediate peers stay subscribed forever",
        ),
        (
            "was_downstream || was_interested",
            "handle_unsubscribe_inbound must gate the global downstream-counter \
             decrement on `was_downstream || was_interested` to stay in sync \
             with `register_downstream_subscriber`'s increment",
        ),
    ];

    for &(snippet, why) in &required {
        assert!(body.contains(snippet), "{why}");
    }
}
/// Source-text pin for the SUBSCRIBE dispatch arm in `node.rs`.
///
/// The inspected window runs from the `NetMessageV1::Subscribe(ref op) => {`
/// arm header up to the `// Non-transactional message types:` marker comment
/// that follows it. Within that window the test requires both a call to
/// `handle_unsubscribe_inbound(` and a destructuring of
/// `SubscribeMsg::Unsubscribe {`.
#[test]
fn subscribe_dispatch_routes_unsubscribe_to_inbound_handler() {
    const SOURCE: &str = include_str!("../../node.rs");
    const ARM_HEADER: &str = "NetMessageV1::Subscribe(ref op) => {";
    const ARM_END_MARKER: &str = "// Non-transactional message types:";

    let arm_at = SOURCE
        .find(ARM_HEADER)
        .expect("SUBSCRIBE branch not found in node.rs");
    // Text from the arm header up to (not including) the end marker.
    let (window, _after_marker) = SOURCE[arm_at..]
        .split_once(ARM_END_MARKER)
        .expect("could not find end of SUBSCRIBE arm");

    let calls_inbound_handler = window.contains("handle_unsubscribe_inbound(");
    assert!(
        calls_inbound_handler,
        "SUBSCRIBE dispatch must call `handle_unsubscribe_inbound` for \
         `SubscribeMsg::Unsubscribe` — without this the wire variant \
         becomes a silent no-op"
    );

    let destructures_variant = window.contains("SubscribeMsg::Unsubscribe {");
    assert!(
        destructures_variant,
        "SUBSCRIBE dispatch must destructure the Unsubscribe variant to \
         extract `id` and `instance_id` for the inbound handler"
    );
}
/// Source-text pin for `finalize_originator_subscribe` in `subscribe.rs`.
///
/// Steps:
/// 1. Locate the function signature and the first `{` after it.
/// 2. Find the matching `}` by counting braces character by character
///    (braces inside string literals or comments are counted too).
/// 3. Strip everything from the first `//` on each line, so snippets that
///    only appear in comments do not satisfy the pins.
/// 4. Assert that the required call snippets are present, that
///    `fetch_contract_if_missing` and `have_body` both first appear before
///    `announce_contract_hosted`, and that `if have_body` and `!is_renewal`
///    occur in the text.
#[test]
fn finalize_originator_subscribe_contains_all_required_side_effects() {
    const SOURCE: &str = include_str!("../subscribe.rs");

    let signature_at = SOURCE
        .find("pub(super) async fn finalize_originator_subscribe(")
        .expect(
            "finalize_originator_subscribe not found in subscribe.rs — \
             rename or removal must trip this pin (issue #4223)",
        );
    let open_at = signature_at
        + SOURCE[signature_at..]
            .find('{')
            .expect("function body open brace not found");

    // Scan from the opening brace; the first time the nesting level returns
    // to zero we have the byte length (inclusive of the closing brace).
    let mut nesting = 0i32;
    let balanced_len = SOURCE[open_at..]
        .char_indices()
        .find_map(|(offset, ch)| match ch {
            '{' => {
                nesting += 1;
                None
            }
            '}' => {
                nesting -= 1;
                if nesting == 0 {
                    Some(offset + 1)
                } else {
                    None
                }
            }
            _ => None,
        });
    // When no balanced close is found the end stays at the open brace and
    // the assertion below reports the broken anchor.
    let end_at = balanced_len.map_or(open_at, |len| open_at + len);
    assert!(
        end_at > open_at,
        "could not find balanced closing brace for \
         finalize_originator_subscribe — body anchor is broken"
    );

    // Drop `//` comments line by line before searching.
    let without_comments = SOURCE[signature_at..end_at]
        .lines()
        .map(|line| line.split_once("//").map_or(line, |(code, _comment)| code))
        .collect::<Vec<_>>()
        .join("\n");
    let body = without_comments.as_str();

    assert!(
        body.contains("register_peer_interest"),
        "finalize_originator_subscribe must register the responding peer as \
         upstream interest — without it `send_unsubscribe_upstream` cannot \
         find the peer to notify on client disconnect (#3874)"
    );
    assert!(
        body.contains("ring.subscribe("),
        "finalize_originator_subscribe must call `ring.subscribe(...)` to \
         install the lease in `active_subscriptions` — without it the \
         contract is not picked up by `contracts_needing_renewal` and the \
         subscription silently dies at TTL expiry (#3851)"
    );

    // Two independent substring checks; they are not required to be adjacent.
    let completes_request =
        body.contains("complete_subscription_request(") && body.contains(", true)");
    assert!(
        completes_request,
        "finalize_originator_subscribe must call \
         `complete_subscription_request(..., true)` to clear the pending \
         mark and reset backoff"
    );
    assert!(
        body.contains("fetch_contract_if_missing"),
        "finalize_originator_subscribe MUST call `fetch_contract_if_missing` \
         so the originator has the contract body locally and can answer \
         subsequent GETs from local state instead of returning NotFound \
         (#4223 — 37% of GETs through subscriber peers were failing)"
    );
    assert!(
        body.contains("announce_contract_hosted"),
        "finalize_originator_subscribe MUST call `announce_contract_hosted` \
         (gated on fetch success) so neighbors include us as an UPDATE \
         broadcast target — without this, UPDATEs may not reach the \
         subscriber even after the contract body is local (#3851)"
    );

    // Ordering pins use the first occurrence of each snippet.
    let fetch_at = body
        .find("fetch_contract_if_missing")
        .expect("fetch call site already asserted above");
    let announce_at = body
        .find("announce_contract_hosted")
        .expect("announce call site already asserted above");
    assert!(
        fetch_at < announce_at,
        "fetch_contract_if_missing must appear BEFORE \
         announce_contract_hosted in finalize_originator_subscribe — \
         announcing before the body is local would tell neighbors to \
         forward UPDATEs to a peer that cannot validate them (Codex \
         HIGH finding on PR #4224)"
    );

    let have_body_at = body
        .find("have_body")
        .expect("finalize_originator_subscribe must bind `have_body` from the fetch match");
    assert!(
        body.contains("if have_body"),
        "finalize_originator_subscribe MUST gate `announce_contract_hosted` \
         on an `if have_body {{ ... }}` conditional — the gate is the \
         Codex HIGH fix invariant, not just the ordering. Without it a \
         future refactor could re-introduce the announce-without-body \
         bug while keeping fetch < announce order."
    );
    assert!(
        have_body_at < announce_at,
        "the `have_body` binding MUST appear before the \
         `announce_contract_hosted` call site so the announce is \
         actually gated by the fetch result, not by some unrelated \
         later binding of the same name"
    );

    assert!(
        body.contains("add_local_client("),
        "finalize_originator_subscribe must call `add_local_client(...)` \
         so inbound ChangeInterests for this contract get processed"
    );
    assert!(
        body.contains("!is_renewal"),
        "finalize_originator_subscribe must gate `add_local_client` on \
         `!is_renewal` — `add_client` is NOT idempotent \
         (`ring::interest::Contract::add_client` increments \
         `local_client_count` on every call), so an unconditional call \
         on every ~2-minute renewal cycle would leak the gauge \
         unboundedly"
    );
}
/// Source-text pin for the `ReplyClass::Subscribed` arm of
/// `drive_client_subscribe_inner` in `op_ctx_task.rs`.
///
/// The inspected window starts at the first `ReplyClass::Subscribed { key } =>`
/// after the function signature and ends at the next `ReplyClass::NotFound =>`.
/// `//` comments are stripped line by line first. The arm must mention
/// `finalize_originator_subscribe` and must not contain any of the direct
/// calls that the helper is expected to own.
#[test]
fn drive_client_subscribe_inner_calls_finalize_helper_on_subscribed() {
    const SOURCE: &str = include_str!("op_ctx_task.rs");
    const ARM_HEADER: &str = "ReplyClass::Subscribed { key } =>";
    const NEXT_ARM_HEADER: &str = "ReplyClass::NotFound =>";

    let fn_at = SOURCE
        .find("async fn drive_client_subscribe_inner(")
        .expect("drive_client_subscribe_inner not found");
    let arm_at = fn_at
        + SOURCE[fn_at..]
            .find(ARM_HEADER)
            .expect("ReplyClass::Subscribed branch not found in drive_client_subscribe_inner");
    // Text of the Subscribed arm, up to (not including) the next arm header.
    let (raw_arm, _after_arm) = SOURCE[arm_at..]
        .split_once(NEXT_ARM_HEADER)
        .expect("end of Subscribed branch not found");

    // Drop `//` comments so commented-out calls neither satisfy nor trip pins.
    let without_comments = raw_arm
        .lines()
        .map(|line| line.split_once("//").map_or(line, |(code, _comment)| code))
        .collect::<Vec<_>>()
        .join("\n");
    let branch = without_comments.as_str();

    assert!(
        branch.contains("finalize_originator_subscribe"),
        "Subscribed branch must delegate to `finalize_originator_subscribe` \
         — inlining the side effects is what caused #4223 (missing \
         fetch_contract_if_missing + announce_contract_hosted). Keep the \
         helper as the single source of truth."
    );

    // (snippet that must be absent, failure explanation), checked in order.
    let forbidden: [(&str, &str); 3] = [
        (
            "ring.subscribe(",
            "Subscribed branch must not call `ring.subscribe(...)` directly — go \
             through `finalize_originator_subscribe` so the fetch + announce \
             steps stay grouped with the lease install",
        ),
        (
            "complete_subscription_request(",
            "Subscribed branch must not call `complete_subscription_request(...)` \
             directly — go through `finalize_originator_subscribe`",
        ),
        (
            "announce_contract_hosted",
            "Subscribed branch must not call `announce_contract_hosted` directly \
             — the fetch-success gate lives inside `finalize_originator_subscribe`; \
             inlining it would re-introduce the Codex HIGH finding on PR #4224 \
             (announcing without the contract body)",
        ),
    ];
    for &(snippet, why) in &forbidden {
        assert!(!branch.contains(snippet), "{why}");
    }
}
/// Serializes a `SubscribeMsg::ForwardingAck` with bincode, deserializes it,
/// and checks that the variant and both of its fields come back unchanged.
#[test]
fn subscribe_forwarding_ack_serde_roundtrip() {
    let tx = Transaction::new::<SubscribeMsg>();
    let contract = ContractInstanceId::new([42; 32]);
    let original = SubscribeMsg::ForwardingAck {
        id: tx,
        instance_id: contract,
    };

    let wire = bincode::serialize(&original).expect("serialize");
    let decoded: SubscribeMsg = bincode::deserialize(&wire).expect("deserialize");

    // Variants are listed explicitly (no wildcard) so the match stays exhaustive.
    match decoded {
        SubscribeMsg::ForwardingAck { id, instance_id } => {
            assert_eq!(id, tx);
            assert_eq!(instance_id, contract);
        }
        other @ (SubscribeMsg::Request { .. }
        | SubscribeMsg::Response { .. }
        | SubscribeMsg::Unsubscribe { .. }) => {
            panic!("Expected ForwardingAck, got {other}")
        }
    }
}
/// Checks that `hop_count` on `SubscribeMsg::Response` survives a bincode
/// serialize/deserialize round trip.
///
/// Each labelled hop count is exercised twice: once with a
/// `SubscribeMsgResult::Subscribed` result and once with
/// `SubscribeMsgResult::NotFound`. The label is used as the `expect` message
/// and is included in assertion failures to identify the failing case.
#[test]
fn test_subscribe_msg_response_hop_count_roundtrip() {
    use freenet_stdlib::prelude::{CodeHash, ContractKey};

    let key =
        ContractKey::from_id_and_code(ContractInstanceId::new([7u8; 32]), CodeHash::new([8u8; 32]));
    let instance_id = *key.id();

    // Round-trips `msg` through bincode and asserts the restored value is a
    // `Response` carrying `expected` as its hop count. `variant` only names
    // the result kind in the failure message.
    let assert_hop_count_roundtrips =
        |msg: &SubscribeMsg, variant: &str, label: &str, expected: usize| {
            let bytes = bincode::serialize(msg).expect(label);
            let restored: SubscribeMsg = bincode::deserialize(&bytes).expect(label);
            // Variants are listed explicitly (no wildcard) so the match stays
            // exhaustive.
            match restored {
                SubscribeMsg::Response { hop_count: hc, .. } => assert_eq!(
                    hc, expected,
                    "{variant} Response.hop_count must roundtrip ({label})"
                ),
                SubscribeMsg::Request { .. }
                | SubscribeMsg::Unsubscribe { .. }
                | SubscribeMsg::ForwardingAck { .. } => {
                    panic!("expected Response for {label}")
                }
            }
        };

    let cases: &[(&str, usize)] = &[
        ("zero", 0),
        ("one", 1),
        ("mid", 4),
        ("htl", 10),
        ("large", 64),
    ];

    for &(label, hop_count) in cases {
        let subscribed = SubscribeMsg::Response {
            id: Transaction::new::<SubscribeMsg>(),
            instance_id,
            result: SubscribeMsgResult::Subscribed { key },
            hop_count,
        };
        assert_hop_count_roundtrips(&subscribed, "Subscribed", label, hop_count);

        let notfound = SubscribeMsg::Response {
            id: Transaction::new::<SubscribeMsg>(),
            instance_id,
            result: SubscribeMsgResult::NotFound,
            hop_count,
        };
        assert_hop_count_roundtrips(&notfound, "NotFound", label, hop_count);
    }
}