freenet 0.2.46

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
//! Task-per-transaction client-initiated GET (#1454 Phase 3b).
//!
//! Mirrors [`crate::operations::put::op_ctx_task`] — the Phase 3a
//! production consumer of [`OpCtx::send_and_await`] for PUT, which in
//! turn follows the Phase 2b SUBSCRIBE driver shape. This module
//! applies the same pattern to client-initiated GET.
//!
//! # Scope (Phase 3b)
//!
//! Only the **client-initiated originator** GET runs through this
//! module. Relay GETs (non-terminal hops), GC-spawned retries, and
//! `start_targeted_op()` (UPDATE-triggered auto-fetch) stay on the
//! legacy re-entry loop. Relay migration is tracked in #3883.
//!
//! # Architecture
//!
//! The task owns all routing state in its locals — there is no `GetOp`
//! in `OpManager.ops.get` for any attempt this task makes. The task:
//!
//! 1. Loops, calling [`OpCtx::send_and_await`] with a fresh
//!    `Transaction` per attempt (single-use-per-tx constraint).
//!    `process_message` handles routing/forwarding on remote hops and
//!    the originator's own loop-back for the local-completion (cache
//!    hit) case, which echoes the `GetMsg::Request` back as the
//!    terminal "reply" via `forward_pending_op_result_if_completed`
//!    (same mechanism PUT 3a relies on for `LocalCompletion`).
//! 2. On terminal `Response{Found}`: the driver stores the returned
//!    state into the local executor via `PutQuery`, runs hosting /
//!    access-tracking / announce side effects, and publishes
//!    `HostResponse::ContractResponse::GetResponse` to the client.
//!    These side effects mirror what the legacy `process_message`
//!    Response{Found} branch does at `get.rs:2329` — for task-per-tx
//!    ops the bypass at `node.rs::handle_pure_network_message_v1`
//!    intercepts the terminal reply before `process_message` runs on
//!    the originator (because `load_or_init` would fail with
//!    `OpNotPresent`), so the driver is the only place they can
//!    happen.
//! 3. On terminal `ResponseStreaming`: Phase 3b delivers the final
//!    payload via a store re-query using the contract key from the
//!    envelope. Stream assembly and local caching for streamed
//!    payloads remain on the legacy path for now — full migration is
//!    tracked in #3883.
//! 4. On terminal Request-echo: the pre-send local-cache shortcut in
//!    `client_events.rs` already returned the state directly, so the
//!    driver just resolves the ContractKey from the store for
//!    telemetry and client delivery.
//! 5. On `Response{NotFound}`: advance to the next peer.
//! 6. On timeout / wire-error: advance to next peer or exhaust.
//!
//! # Connection-drop latency (R6)
//!
//! Legacy `handle_abort` detects disconnects in <1s. Task-per-tx
//! relies on the `OPERATION_TTL` (60s) timeout. Accepted ceiling,
//! matching Phase 2b/3a.

use std::sync::Arc;

use freenet_stdlib::client_api::{ContractResponse, ErrorKind, HostResponse};
use freenet_stdlib::prelude::*;

use crate::client_events::HostResult;
use crate::config::GlobalExecutor;
use crate::contract::{ContractHandlerEvent, StoreResponse};
use crate::message::{NetMessage, NetMessageV1, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::operations::VisitedPeers;
use crate::operations::op_ctx::{
    AdvanceOutcome, AttemptOutcome, RetryDriver, RetryLoopOutcome, drive_retry_loop,
};
use crate::ring::{Location, PeerKeyLocation};
use crate::router::{RouteEvent, RouteOutcome};
use crate::transport::peer_connection::StreamId;

use super::{GetMsg, GetMsgResult, GetStreamingPayload};
use crate::operations::orphan_streams::{OrphanStreamError, STREAM_CLAIM_TIMEOUT};

/// Test-only counter that increments every time `start_client_get` is
/// called. Used by integration tests to verify that a GET actually
/// routed through the task-per-tx driver rather than being satisfied
/// by the `client_events.rs` local-cache shortcut.
///
/// This module only ever `fetch_add`s — it never resets the counter —
/// so tests should snapshot-and-diff rather than assume it starts at
/// zero. Compiled out entirely in non-test, non-"testing" builds.
#[cfg(any(test, feature = "testing"))]
pub static DRIVER_CALL_COUNT: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);

/// Kick off a client-initiated GET and return as soon as the driving
/// task has been spawned (matching legacy `request_get` timing).
///
/// Contract: the caller has already registered a result waiter for
/// `client_tx` via `op_manager.ch_outbound.waiting_for_transaction_result`.
/// This function never touches that waiter — it only drives the
/// ring/network side; the terminal result reaches the client through
/// `result_router_tx`, keyed by `client_tx`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<Transaction, OpError> {
    // Test-only bookkeeping: lets integration tests distinguish a GET
    // that went through this driver from one satisfied entirely by the
    // `client_events.rs` local-cache shortcut. Compiled out otherwise.
    #[cfg(any(test, feature = "testing"))]
    DRIVER_CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

    tracing::debug!(
        tx = %client_tx,
        contract = %instance_id,
        "get (task-per-tx): spawning client-initiated task"
    );

    // Spawn and detach, mirroring PUT 3a's `start_client_put`. Any
    // failure surfaces to the client via `result_router_tx`, never via
    // this return value, so there is nothing to await here. The task
    // is intentionally NOT registered with `BackgroundTaskMonitor`:
    // it is per-transaction and always terminates (happy path,
    // exhaustion, timeout, or infra error).
    let task = run_client_get(
        op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    );
    GlobalExecutor::spawn(task);

    Ok(client_tx)
}

/// Task body: drive the GET to a terminal `DriverOutcome`, then hand
/// it to `deliver_outcome` for publication to the client.
async fn run_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // Keep a handle for delivery; the driver consumes the other one.
    let manager_for_delivery = Arc::clone(&op_manager);
    let outcome = drive_client_get(
        op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await;
    deliver_outcome(&manager_for_delivery, client_tx, outcome);
}

/// GET driver has exactly two outcomes, matching PUT 3a.
#[derive(Debug)]
enum DriverOutcome {
    /// The driver produced a `HostResult` that must be published via
    /// `result_router_tx`. Covers both success and client-visible
    /// operation errors (e.g. peer exhaustion).
    Publish(HostResult),
    /// A genuine infrastructure failure escaped the driver loop —
    /// distinct from `Publish(Err(..))`, which is a routable,
    /// client-facing error rather than a node-side fault.
    InfrastructureError(OpError),
}

/// Adapter over `drive_client_get_inner`: folds any escaped `OpError`
/// into `DriverOutcome::InfrastructureError` so the caller only ever
/// handles one outcome type.
async fn drive_client_get(
    op_manager: Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> DriverOutcome {
    drive_client_get_inner(
        &op_manager,
        client_tx,
        instance_id,
        return_contract_code,
        subscribe,
        blocking_subscribe,
    )
    .await
    .unwrap_or_else(DriverOutcome::InfrastructureError)
}

/// Inner GET driver: runs the retry loop and, on a terminal reply,
/// performs all originator-side side effects (local caching, auto- or
/// explicit subscribe, routing telemetry) before handing back the
/// client-facing outcome. Infrastructure errors bubble up as `Err`
/// and are wrapped into `DriverOutcome` by `drive_client_get`.
async fn drive_client_get_inner(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    instance_id: ContractInstanceId,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> Result<DriverOutcome, OpError> {
    // Hop budget for every attempt; read once from ring config.
    let htl = op_manager.ring.max_hops_to_live;

    // Pre-select initial target for the driver's retry state. Actual
    // routing is done by `process_message` on the loop-back; this is
    // just so `advance_to_next_peer` has a starting "tried" set.
    //
    // At the client-API boundary we only have an instance_id — the
    // full ContractKey (which includes the code hash) isn't known
    // until a terminal reply with `GetMsgResult::Found` arrives.
    // `k_closest_potentially_hosting` accepts either.
    let mut tried: Vec<std::net::SocketAddr> = Vec::new();
    if let Some(own_addr) = op_manager.ring.connection_manager.get_own_addr() {
        tried.push(own_addr);
    }
    let initial_target = op_manager
        .ring
        .k_closest_potentially_hosting(&instance_id, tried.as_slice(), 1)
        .into_iter()
        .next();
    let current_target = match initial_target {
        Some(peer) => {
            if let Some(addr) = peer.socket_addr() {
                tried.push(addr);
            }
            peer
        }
        // No candidate peers: fall back to ourselves so the retry
        // state always has a well-defined target.
        None => op_manager.ring.connection_manager.own_location(),
    };

    let mut driver = GetRetryDriver {
        op_manager,
        instance_id,
        htl,
        tried,
        retries: 0,
        current_target,
        attempt_visited: VisitedPeers::new(&client_tx),
    };

    let loop_result = drive_retry_loop(op_manager, client_tx, "get", &mut driver).await;

    match loop_result {
        RetryLoopOutcome::Done(terminal) => {
            // Clean up any DashMap entry left behind. `op_manager.completed`
            // is idempotent, so calling it even when the driver never
            // pushed is harmless.
            op_manager.completed(client_tx);

            // Mirror the originator-side side effects that the legacy
            // `process_message` Response{Found} branch does
            // (`get.rs:2218–2450`): PutQuery the fetched state into
            // the local executor so re-GETs / local-cache checks / the
            // hosting LRU see it, announce hosting, and record the
            // access. Without this, a client-initiated GET succeeds
            // on the wire but the requesting node never stores the
            // contract — which broke `test_get_routing_coverage_low_htl`
            // and `test_auto_fetch_from_update_sender` on CI when the
            // bypass was first introduced.
            //
            // The bypass at `node.rs::handle_pure_network_message_v1`
            // intercepts the terminal reply BEFORE `process_message`
            // runs on the originator (by design — process_message
            // would fail with OpNotPresent for a task-per-tx op), so
            // the driver is the only place these side effects can
            // happen for Phase 3b.
            let reply_key = match &terminal {
                Terminal::InlineFound {
                    key,
                    state,
                    contract,
                } => {
                    cache_contract_locally(op_manager, *key, state.clone(), contract.clone()).await;
                    *key
                }
                Terminal::Streaming {
                    key,
                    stream_id,
                    includes_contract,
                } => {
                    // Assemble the stream and cache locally. Mirrors
                    // the legacy `process_message` streaming branch
                    // at `get.rs:2721-3196`. Uses `current_target`
                    // as the sender address — accurate for the
                    // single-hop response case where the responder
                    // equals the selected target.
                    if let Some(peer_addr) = driver.current_target.socket_addr() {
                        if let Err(e) = assemble_and_cache_stream(
                            op_manager,
                            peer_addr,
                            *stream_id,
                            *key,
                            *includes_contract,
                        )
                        .await
                        {
                            // Non-fatal: the client still gets whatever
                            // `build_host_response` finds in the store.
                            tracing::warn!(
                                %key,
                                error = %e,
                                "get (task-per-tx): stream assembly failed — \
                                 state will not be cached locally"
                            );
                        }
                    } else {
                        tracing::warn!(
                            %key,
                            "get (task-per-tx): current_target has no socket_addr; \
                             cannot claim orphan stream"
                        );
                    }
                    *key
                }
                Terminal::LocalCompletion => {
                    // Request-echo: the pre-send local-cache shortcut
                    // in `client_events.rs` already returned a cached
                    // state directly, so reaching here means
                    // `request_get`'s fallback cached it. The store
                    // already has the bytes; just resolve the key.
                    match lookup_stored_key(op_manager, &instance_id).await {
                        Some(k) => k,
                        // Zero-code-hash sentinel: telemetry only needs
                        // the instance-derived Location.
                        None => synthetic_key(&instance_id),
                    }
                }
            };

            let host_result =
                build_host_response(op_manager, &instance_id, return_contract_code).await;

            // Auto-subscribe on successful GET at the originator —
            // mirrors the legacy branches at get.rs:2313/2408/3136/3185.
            // AUTO_SUBSCRIBE_ON_GET (ring.rs:60) is a const; we still
            // guard on `is_subscribed` to avoid duplicate registration
            // if the request-router already wired up a subscribe.
            //
            // When the client explicitly set `subscribe=true`, the
            // dedicated `maybe_subscribe_child` path below runs — skip
            // auto-subscribe here so we never double-subscribe.
            if host_result.is_ok()
                && !subscribe
                && crate::ring::AUTO_SUBSCRIBE_ON_GET
                && !op_manager.ring.is_subscribed(&reply_key)
            {
                let path_label = match &terminal {
                    Terminal::Streaming { .. } => "streaming (task-per-tx)",
                    Terminal::InlineFound { .. } | Terminal::LocalCompletion => {
                        "non-streaming (task-per-tx)"
                    }
                };
                crate::operations::auto_subscribe_on_get_response(
                    op_manager,
                    &reply_key,
                    &client_tx,
                    &Some(driver.current_target.clone()),
                    /* subscribe_requested */ false,
                    /* blocking_sub */ blocking_subscribe,
                    path_label,
                )
                .await;
            }

            // Emit routing event + telemetry — `report_result` (which
            // normally does both) doesn't run because the bypass
            // intercepted the Response. Without this, the router's
            // prediction model never receives GET success feedback.
            //
            // The success flag tracks the actual client-visible
            // outcome (`host_result.is_ok()`), not the wire-level
            // reply — if the store re-query returned nothing, the
            // client sees OperationError and telemetry must agree.
            let contract_location = Location::from(&reply_key);
            let route_event = RouteEvent {
                peer: driver.current_target.clone(),
                contract_location,
                outcome: if host_result.is_ok() {
                    RouteOutcome::SuccessUntimed
                } else {
                    RouteOutcome::Failure
                },
                op_type: Some(crate::node::network_status::OpType::Get),
            };
            if let Some(log_event) =
                crate::tracing::NetEventLog::route_event(&client_tx, &op_manager.ring, &route_event)
            {
                op_manager
                    .ring
                    .register_events(either::Either::Left(log_event))
                    .await;
            }
            op_manager.ring.routing_finished(route_event);
            crate::node::network_status::record_op_result(
                crate::node::network_status::OpType::Get,
                host_result.is_ok(),
            );

            // Explicit-subscribe hand-off. Mirrors PUT 3a's
            // `maybe_subscribe_child` — subscribe is never handled in
            // the terminal-result construction to avoid double-subscribe
            // (commit 494a3c69). This only runs when the client set
            // `subscribe=true`; the auto-subscribe path above handles
            // the AUTO_SUBSCRIBE_ON_GET fallback.
            maybe_subscribe_child(
                op_manager,
                client_tx,
                reply_key,
                subscribe,
                blocking_subscribe,
            )
            .await;

            Ok(DriverOutcome::Publish(host_result))
        }
        // Peer exhaustion is a client-visible operation error, not an
        // infrastructure failure — publish it to the client.
        RetryLoopOutcome::Exhausted(cause) => {
            Ok(DriverOutcome::Publish(Err(ErrorKind::OperationError {
                cause: cause.into(),
            }
            .into())))
        }
        // A reply arrived that `classify` couldn't map to any known
        // shape — treated as a node-side fault.
        RetryLoopOutcome::Unexpected => Err(OpError::UnexpectedOpState),
        RetryLoopOutcome::InfraError(err) => Err(err),
    }
}

// --- Retry-driver state and classification ---

/// Mutable routing state owned by the GET task — the task-per-tx
/// stand-in for the `GetOp` the legacy path would keep in
/// `OpManager.ops.get`.
struct GetRetryDriver<'a> {
    op_manager: &'a OpManager,
    // Contract being fetched; only the instance id is known at the
    // client-API boundary (the full ContractKey arrives with the
    // terminal reply).
    instance_id: ContractInstanceId,
    // Hop budget attached to each attempt's `GetMsg::Request`.
    htl: usize,
    // Socket addresses already attempted (seeded with our own address);
    // threaded through `advance_to_next_peer` when picking the next host.
    tried: Vec<std::net::SocketAddr>,
    // Retry counter, also threaded through `advance_to_next_peer`.
    retries: usize,
    // Peer the current attempt targets; consulted after a terminal
    // reply for stream claiming and routing telemetry.
    current_target: PeerKeyLocation,
    // Per-attempt visited set; reset in `new_attempt_tx` for each
    // fresh transaction.
    attempt_visited: VisitedPeers,
}

/// Terminal value for the GET driver.
///
/// Carries the bytes needed to (a) store the contract in the local
/// executor via `PutQuery` — matching the side effect that the legacy
/// `process_message` Response{Found} branch performs at
/// `get.rs:2329` — and (b) build the client-facing
/// `HostResponse::GetResponse`.
#[derive(Debug)]
enum Terminal {
    /// Inline Response{Found}: state and optional contract arrived in
    /// the reply envelope; driver stores them locally via PutQuery
    /// (see `cache_contract_locally`).
    InlineFound {
        key: ContractKey,
        state: WrappedState,
        contract: Option<ContractContainer>,
    },
    /// ResponseStreaming: the envelope references a stream_id whose
    /// bytes arrive separately via the orphan stream registry. The
    /// driver claims the stream, awaits assembly, and caches the
    /// assembled state + contract locally — mirroring what the
    /// legacy `process_message` streaming branch does at
    /// `get.rs:2721-3196`.
    Streaming {
        key: ContractKey,
        stream_id: StreamId,
        includes_contract: bool,
    },
    /// Request-echo from `forward_pending_op_result_if_completed` —
    /// state was already in the local store (via the pre-send
    /// local-cache shortcut), so no new PutQuery is needed. The
    /// driver just resolves the key from the store (falling back to
    /// a synthetic zero-code-hash key for telemetry).
    LocalCompletion,
}

/// Classify a reply into a driver outcome. Extracted from the
/// `RetryDriver::classify` impl so it's reachable from unit tests.
fn classify(reply: NetMessage) -> AttemptOutcome<Terminal> {
    match reply {
        NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            result:
                GetMsgResult::Found {
                    key,
                    value:
                        StoreResponse {
                            state: Some(state),
                            contract,
                        },
                },
            ..
        })) => AttemptOutcome::Terminal(Terminal::InlineFound {
            key,
            state,
            contract,
        }),
        NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            result: GetMsgResult::Found { value, .. },
            ..
        })) => {
            tracing::warn!(
                ?value,
                "get (task-per-tx): Response{{Found}} arrived without state"
            );
            AttemptOutcome::Unexpected
        }
        NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            result: GetMsgResult::NotFound,
            ..
        })) => AttemptOutcome::Retry,
        NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
            key,
            stream_id,
            includes_contract,
            ..
        })) => AttemptOutcome::Terminal(Terminal::Streaming {
            key,
            stream_id,
            includes_contract,
        }),
        NetMessage::V1(NetMessageV1::Get(GetMsg::Request { .. })) => {
            AttemptOutcome::Terminal(Terminal::LocalCompletion)
        }
        // Explicit non-terminal `GetMsg` variants. These should never
        // reach the driver — the bypass at
        // `node.rs::handle_pure_network_message_v1` gates forwarding
        // on terminal variants only — so their arrival here indicates
        // a bug in the bypass gate (Phase 2b Bug 2 class).
        NetMessage::V1(NetMessageV1::Get(
            GetMsg::ForwardingAck { .. } | GetMsg::ResponseStreamingAck { .. },
        )) => AttemptOutcome::Unexpected,
        // Non-GET NetMessage variants (or any future `GetMsg` variant
        // added without updating this match) fall through to
        // Unexpected. If a new GetMsg variant is added, this arm must
        // be audited — the Phase 3b GET driver has explicit handling
        // for every variant above, and the bypass filter in node.rs
        // must be extended in lockstep.
        _ => AttemptOutcome::Unexpected,
    }
}

impl RetryDriver for GetRetryDriver<'_> {
    type Terminal = Terminal;

    /// Mint a fresh transaction for the next attempt and reset the
    /// per-attempt visited-peer set keyed to it.
    fn new_attempt_tx(&mut self) -> Transaction {
        let attempt_tx = Transaction::new::<GetMsg>();
        self.attempt_visited = VisitedPeers::new(&attempt_tx);
        attempt_tx
    }

    /// Build the wire request for `attempt_tx`. The node always asks
    /// for the contract code (`fetch_contract: true`) so it can cache
    /// and validate locally; the client's own flag is applied later in
    /// `build_host_response`.
    fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {
        let request = GetMsg::Request {
            id: attempt_tx,
            instance_id: self.instance_id,
            fetch_contract: true,
            htl: self.htl,
            visited: self.attempt_visited.clone(),
            subscribe: false,
        };
        NetMessage::from(request)
    }

    fn classify(&mut self, reply: NetMessage) -> AttemptOutcome<Terminal> {
        classify(reply)
    }

    /// Pick the next candidate peer, or report exhaustion when no
    /// untried host remains.
    fn advance(&mut self) -> AdvanceOutcome {
        if let Some((next_target, _next_addr)) = advance_to_next_peer(
            self.op_manager,
            &self.instance_id,
            &mut self.tried,
            &mut self.retries,
        ) {
            self.current_target = next_target;
            AdvanceOutcome::Next
        } else {
            AdvanceOutcome::Exhausted
        }
    }
}

// --- Host-response construction ---

/// Query the local contract store for `(state, contract)` and package
/// a client-facing `HostResponse::ContractResponse::GetResponse`.
///
/// If the local store doesn't have the state (which should not happen
/// on the happy path — `process_message` stores before the terminal
/// reply fires), synthesize an operation error matching the shape
/// `to_host_result` produces on NotFound.
async fn build_host_response(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    return_contract_code: bool,
) -> HostResult {
    let lookup = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *instance_id,
            return_contract_code,
        })
        .await;

    // Happy path: the store has the state (and possibly the code).
    if let Ok(ContractHandlerEvent::GetResponse {
        key: Some(resolved_key),
        response:
            Ok(StoreResponse {
                state: Some(state),
                contract,
            }),
    }) = lookup
    {
        // Strip contract code if client didn't ask for it. The node
        // always pulls WASM for local caching/validation, but the
        // client-facing payload obeys their flag.
        let client_contract = return_contract_code.then_some(contract).flatten();
        return Ok(HostResponse::ContractResponse(
            ContractResponse::GetResponse {
                key: resolved_key,
                contract: client_contract,
                state,
            },
        ));
    }

    // Anything else: no key, no state, or a store error — surface a
    // client-visible operation error.
    tracing::warn!(
        contract = %instance_id,
        "get (task-per-tx): terminal reply classified success but local \
         store lookup returned no state; synthesizing client error"
    );
    Err(ErrorKind::OperationError {
        cause: format!("GET succeeded on wire but local store lookup failed for {instance_id}")
            .into(),
    }
    .into())
}

/// Fallback `ContractKey` for telemetry when we have neither a
/// remote-reply key nor a store-resolved key. A zero code-hash is the
/// documented sentinel — routing telemetry only needs a `Location`
/// derived from the instance_id, which this preserves.
fn synthetic_key(instance_id: &ContractInstanceId) -> ContractKey {
    let zero_code_hash = CodeHash::new([0u8; 32]);
    ContractKey::from_id_and_code(*instance_id, zero_code_hash)
}

/// Store the fetched contract state in the local executor and run
/// the originator-side hosting side effects. Mirrors the legacy
/// `process_message` Response{Found} branch at `get.rs:2218-2450`:
///
/// 1. **Idempotency short-circuit (issue #2018)** — re-query the
///    local store first. If the existing bytes match the incoming
///    state, skip `PutQuery` entirely so contracts that enforce
///    identical-state rejection in `update_state()` aren't invoked
///    redundantly.
/// 2. **Unconditional hosting refresh** — `record_get_access`,
///    `mark_local_client_access`, and interest-manager eviction
///    cleanup run for BOTH the state-matches short-circuit AND the
///    PutQuery-failed error paths. Legacy behaviour at
///    `get.rs:2420-2435` continues these side effects on error so
///    re-GETs keep refreshing the hosting LRU/TTL even when the
///    executor rejected the write.
/// 3. **Newly-hosted announcement** — only runs when the local
///    store actually transitioned from no-state to has-state
///    (i.e., `access_result.is_new && put_persisted`).
async fn cache_contract_locally(
    op_manager: &OpManager,
    key: ContractKey,
    state: WrappedState,
    contract: Option<ContractContainer>,
) {
    // Captured up front: `state` is moved into the write-path handler
    // event further down, so the size must be read before that.
    let state_size = state.size() as u64;

    // (1) Idempotency short-circuit: re-query the local store FIRST.
    // Comparing bytes against the incoming state avoids re-invoking
    // `update_state()` in contracts that reject identical updates
    // (regression guard for issue #2018 / PR #2018).
    let local_state = op_manager
        .notify_contract_handler(ContractHandlerEvent::GetQuery {
            instance_id: *key.id(),
            return_contract_code: false,
        })
        .await;
    // Only a stored-state byte-for-byte match counts; every other shape
    // (no state, lookup error, unexpected event) falls through to the
    // write path below.
    let state_matches = matches!(
        &local_state,
        Ok(ContractHandlerEvent::GetResponse {
            response: Ok(StoreResponse {
                state: Some(local),
                ..
            }),
            ..
        }) if local.as_ref() == state.as_ref(),
    );

    // (2) Decide whether PutQuery must run. If local state already
    // matches or we lack the contract code (issue #2306), skip the
    // PutQuery but STILL run hosting side effects below so LRU/TTL
    // refresh does not depend on the write path.
    let put_persisted = if state_matches {
        tracing::debug!(
            %key,
            "get (task-per-tx): local state matches, skipping redundant PutQuery"
        );
        false
    } else if let Some(contract_code) = contract {
        match op_manager
            .notify_contract_handler(ContractHandlerEvent::PutQuery {
                key,
                state,
                related_contracts: RelatedContracts::default(),
                contract: Some(contract_code),
            })
            .await
        {
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Ok(_), ..
            }) => true,
            // All failure shapes below are logged but non-fatal: the
            // hosting refresh in step (3) must still run.
            Ok(ContractHandlerEvent::PutResponse {
                new_value: Err(err),
                ..
            }) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery rejected by executor"
                );
                false
            }
            Ok(other) => {
                tracing::warn!(
                    %key,
                    ?other,
                    "get (task-per-tx): PutQuery returned unexpected event"
                );
                false
            }
            Err(err) => {
                tracing::warn!(
                    %key,
                    %err,
                    "get (task-per-tx): PutQuery failed"
                );
                false
            }
        }
    } else {
        // No contract code + state differs — we can't cache (issue #2306).
        // Still refresh hosting side effects below so re-GET TTL bookkeeping
        // isn't gated on the write path.
        tracing::debug!(
            %key,
            "get (task-per-tx): skipping local cache — contract code missing"
        );
        false
    };

    // (3) Hosting side effects ALWAYS run (state_matches, put_persisted,
    // or put failed). This mirrors the legacy invariant that a
    // successful wire-level GET must refresh the hosting LRU/TTL
    // regardless of what the local executor did with the state.
    let access_result = op_manager.ring.record_get_access(key, state_size);
    op_manager.ring.mark_local_client_access(&key);

    // Contracts evicted by the access tracking must also stop being
    // advertised as locally hosted; collect those whose hosting
    // registration we actually removed for the broadcast below.
    let mut removed_contracts = Vec::new();
    for evicted_key in &access_result.evicted {
        if op_manager
            .interest_manager
            .unregister_local_hosting(evicted_key)
        {
            removed_contracts.push(*evicted_key);
        }
    }

    // (4) Newly-hosted announcement gates on BOTH first-time access
    // AND the fact that we actually persisted new state. Without
    // persistence there's nothing to announce hosting for.
    if access_result.is_new && put_persisted {
        crate::operations::announce_contract_hosted(op_manager, &key).await;
        let became_interested = op_manager.interest_manager.register_local_hosting(&key);
        let added = if became_interested { vec![key] } else { vec![] };
        if !added.is_empty() || !removed_contracts.is_empty() {
            crate::operations::broadcast_change_interests(op_manager, added, removed_contracts)
                .await;
        }
    } else if !removed_contracts.is_empty() {
        // Evictions still need broadcasting even when nothing new was
        // hosted.
        crate::operations::broadcast_change_interests(op_manager, vec![], removed_contracts).await;
    }
}

/// Claim an orphan stream, await assembly, deserialize the payload,
/// and cache the contract state locally.
///
/// Mirrors the originator-side streaming branch of the legacy
/// `process_message` at `get.rs:2721-3196`. The driver is the only
/// place this can run for task-per-tx GETs because the bypass at
/// `node.rs::handle_pure_network_message_v1` forwards the
/// `ResponseStreaming` envelope to the driver before
/// `handle_op_request` — `process_message` never executes on the
/// originator for task-per-tx ops (`load_or_init` would return
/// `OpNotPresent`).
///
/// `peer_addr` is the sender's transport address — currently we
/// use `driver.current_target.socket_addr()`, which is accurate for
/// single-hop responses. Multi-hop (where a relay answers on behalf
/// of a further peer) is not yet supported by the task-per-tx driver;
/// see #3883.
///
/// Returns `Err(String)` for infrastructure-level failures (claim
/// timeout, assembly error, deserialize error, key mismatch, missing
/// state); the caller decides how to surface those.
async fn assemble_and_cache_stream(
    op_manager: &OpManager,
    peer_addr: std::net::SocketAddr,
    stream_id: StreamId,
    expected_key: ContractKey,
    includes_contract: bool,
) -> Result<(), String> {
    // Claiming is first-come-first-served: a concurrent claimant means
    // another task is already assembling this exact stream.
    let handle = match op_manager
        .orphan_stream_registry()
        .claim_or_wait(peer_addr, stream_id, STREAM_CLAIM_TIMEOUT)
        .await
    {
        Ok(h) => h,
        Err(OrphanStreamError::AlreadyClaimed) => {
            // Treated as success (dedup), not failure — the other
            // claimant will finish the caching.
            tracing::debug!(
                %peer_addr,
                %stream_id,
                "stream already claimed (dedup)"
            );
            return Ok(());
        }
        Err(e) => return Err(format!("claim_or_wait: {e}")),
    };

    let bytes = handle
        .assemble()
        .await
        .map_err(|e| format!("stream assembly: {e}"))?;

    let payload: GetStreamingPayload =
        bincode::deserialize(&bytes).map_err(|e| format!("deserialize: {e}"))?;

    // A key mismatch would cache the wrong contract's state under the
    // expected key — reject loudly instead.
    if payload.key != expected_key {
        return Err(format!(
            "stream key mismatch: expected {expected_key}, got {}",
            payload.key
        ));
    }

    let Some(state) = payload.value.state else {
        return Err("stream payload has no state".into());
    };

    // Honor the envelope's `includes_contract` flag: ignore any
    // contract code in the payload when the flag says none was sent.
    let contract = if includes_contract {
        payload.value.contract
    } else {
        None
    };

    cache_contract_locally(op_manager, payload.key, state, contract).await;
    Ok(())
}

/// Re-query the local store for the contract key, used on the
/// LocalCompletion path where the Request echo carries only an
/// instance_id.
///
/// Returns `None` when the lookup errors out, the store has no key
/// for the instance, or the handler replies with an unexpected event.
async fn lookup_stored_key(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
) -> Option<ContractKey> {
    let query = ContractHandlerEvent::GetQuery {
        instance_id: *instance_id,
        return_contract_code: false,
    };
    if let Ok(ContractHandlerEvent::GetResponse {
        key: Some(key),
        response: Ok(_),
    }) = op_manager.notify_contract_handler(query).await
    {
        Some(key)
    } else {
        None
    }
}

// --- Peer advance ---

/// Maximum routing rounds before giving up. Matches PUT 3a's
/// `MAX_RETRIES = 3` and SUBSCRIBE's driver. With typical ring
/// fan-out of 3–5 peers per k_closest call, 3 rounds covers
/// 9–15 distinct peers.
const MAX_RETRIES: usize = 3;

/// Pick the next candidate peer for this GET, consuming one unit of
/// retry budget and recording the chosen transport address in `tried`
/// so later rounds skip it. Returns `None` when the budget is spent,
/// the ring offers no untried candidate, or the candidate has no
/// known socket address.
fn advance_to_next_peer(
    op_manager: &OpManager,
    instance_id: &ContractInstanceId,
    tried: &mut Vec<std::net::SocketAddr>,
    retries: &mut usize,
) -> Option<(PeerKeyLocation, std::net::SocketAddr)> {
    if *retries == MAX_RETRIES || *retries > MAX_RETRIES {
        return None;
    }
    // Budget is consumed even if no candidate turns up below.
    *retries += 1;

    let candidates = op_manager
        .ring
        .k_closest_potentially_hosting(instance_id, tried.as_slice(), 1);
    let peer = candidates.into_iter().next()?;
    let addr = peer.socket_addr()?;
    tried.push(addr);
    Some((peer, addr))
}

// --- Subscribe child ---

/// Start a post-GET subscription if requested. Mirrors
/// `put::op_ctx_task::maybe_subscribe_child` verbatim.
///
/// For `blocking_subscribe = true`, awaits the subscribe driver inline.
/// For `blocking_subscribe = false`, spawns a fire-and-forget task.
async fn maybe_subscribe_child(
    op_manager: &Arc<OpManager>,
    client_tx: Transaction,
    key: ContractKey,
    subscribe: bool,
    blocking_subscribe: bool,
) {
    // Guard first: none of the registration below may run when the
    // client did not ask for a subscription.
    if !subscribe {
        return;
    }

    use crate::operations::subscribe;

    // Child transaction ties the subscribe lifecycle to the client's
    // GET transaction for correlation.
    let child_tx = Transaction::new_child_of::<subscribe::SubscribeMsg>(&client_tx);

    // Register the child so `LocalSubscribeComplete` hits the
    // silent-absorb branch instead of trying to publish to a
    // nonexistent waiter.
    op_manager.expect_and_register_sub_operation(client_tx, child_tx);

    if blocking_subscribe {
        subscribe::run_client_subscribe(op_manager.clone(), *key.id(), child_tx).await;
    } else {
        // Fire-and-forget: the GET outcome does not wait on the
        // subscription result.
        GlobalExecutor::spawn(subscribe::run_client_subscribe(
            op_manager.clone(),
            *key.id(),
            child_tx,
        ));
    }
}

// --- Outcome delivery ---

/// Publish the driver's final outcome to the client waiter. A
/// `Publish` outcome is forwarded as-is; an `InfrastructureError` is
/// logged and converted into a synthesized `OperationError` so the
/// client always receives a terminal result.
fn deliver_outcome(op_manager: &OpManager, client_tx: Transaction, outcome: DriverOutcome) {
    let result: HostResult = match outcome {
        DriverOutcome::Publish(result) => result,
        DriverOutcome::InfrastructureError(err) => {
            tracing::warn!(
                tx = %client_tx,
                error = %err,
                "get (task-per-tx): infrastructure error; publishing synthesized client error"
            );
            Err(ErrorKind::OperationError {
                cause: format!("GET failed: {err}").into(),
            }
            .into())
        }
    };
    op_manager.send_client_result(client_tx, result);
}

#[cfg(test)]
mod tests {
    //! Unit tests for the task-per-tx GET driver. Two families:
    //! pure-data tests (message classification, payload round-trip)
    //! and source-scrape tests that pin structural invariants of the
    //! production code via `include_str!` — the latter exist because
    //! several bug classes here are about statement ORDER, which a
    //! behavioral test cannot easily observe.

    use super::*;

    /// Fixed, arbitrary contract key for classification tests.
    fn dummy_key() -> ContractKey {
        ContractKey::from_id_and_code(ContractInstanceId::new([1u8; 32]), CodeHash::new([2u8; 32]))
    }

    /// Fresh GET transaction id for each test.
    fn dummy_tx() -> Transaction {
        Transaction::new::<GetMsg>()
    }

    #[test]
    fn classify_response_found_is_inline_terminal() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::Found {
                key,
                value: StoreResponse {
                    state: Some(WrappedState::new(vec![1u8])),
                    contract: None,
                },
            },
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::InlineFound { .. })
        ));
    }

    #[test]
    fn classify_response_notfound_is_retry() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::NotFound,
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Retry));
    }

    #[test]
    fn classify_response_streaming_is_streaming_terminal() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreaming {
            id: tx,
            instance_id: *key.id(),
            stream_id: crate::transport::peer_connection::StreamId::next(),
            key,
            total_size: 1024,
            includes_contract: true,
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::Streaming { .. })
        ));
    }

    #[test]
    fn classify_forwarding_ack_is_unexpected() {
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ForwardingAck {
            id: tx,
            instance_id: *key.id(),
        }));
        assert!(
            matches!(classify(msg), AttemptOutcome::Unexpected),
            "ForwardingAck must NOT be classified as terminal (Phase 2b bug 2)"
        );
    }

    #[test]
    fn classify_response_streaming_ack_is_unexpected() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::ResponseStreamingAck {
            id: tx,
            stream_id: crate::transport::peer_connection::StreamId::next(),
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    #[test]
    fn classify_request_echo_is_local_completion() {
        // When process_message completes locally (no next hop, contract
        // already cached), the Request is echoed back via
        // forward_pending_op_result_if_completed.
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Request {
            id: tx,
            instance_id: *key.id(),
            fetch_contract: true,
            htl: 5,
            visited: VisitedPeers::new(&tx),
            subscribe: false,
        }));
        assert!(matches!(
            classify(msg),
            AttemptOutcome::Terminal(Terminal::LocalCompletion)
        ));
    }

    #[test]
    fn classify_response_found_without_state_is_unexpected() {
        // Defensive: if a peer somehow returns Found but the inner
        // StoreResponse has no state, the driver must NOT build an
        // InlineFound Terminal with a missing state.
        let tx = dummy_tx();
        let key = dummy_key();
        let msg = NetMessage::V1(NetMessageV1::Get(GetMsg::Response {
            id: tx,
            instance_id: *key.id(),
            result: GetMsgResult::Found {
                key,
                value: StoreResponse {
                    state: None,
                    contract: None,
                },
            },
        }));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    #[test]
    fn classify_unexpected_for_non_get_message() {
        let tx = dummy_tx();
        let msg = NetMessage::V1(NetMessageV1::Aborted(tx));
        assert!(matches!(classify(msg), AttemptOutcome::Unexpected));
    }

    #[test]
    fn max_retries_boundary_exhausts_at_limit() {
        // Pure counting check of the retry-budget boundary condition.
        let mut retries: usize = 0;
        for _ in 0..MAX_RETRIES {
            assert!(retries < MAX_RETRIES, "should not exhaust before limit");
            retries += 1;
        }
        assert!(
            retries >= MAX_RETRIES,
            "should exhaust at MAX_RETRIES={MAX_RETRIES}"
        );
    }

    #[test]
    fn driver_outcome_exhausted_produces_client_error() {
        let cause = "GET to contract failed after 3 attempts".to_string();
        let outcome: DriverOutcome = match RetryLoopOutcome::<()>::Exhausted(cause) {
            RetryLoopOutcome::Exhausted(cause) => {
                DriverOutcome::Publish(Err(ErrorKind::OperationError {
                    cause: cause.into(),
                }
                .into()))
            }
            RetryLoopOutcome::Done(_)
            | RetryLoopOutcome::Unexpected
            | RetryLoopOutcome::InfraError(_) => unreachable!(),
        };
        assert!(
            matches!(outcome, DriverOutcome::Publish(Err(_))),
            "Exhaustion must produce a client error, not be swallowed"
        );
    }

    /// Extract the non-test source of `op_ctx_task.rs` by truncating
    /// at the `#[cfg(test)]` marker. Used by the bug-reproduction
    /// source-scrape tests below so that comments inside this test
    /// module don't create false positives.
    fn production_source() -> &'static str {
        const FULL: &str = include_str!("op_ctx_task.rs");
        let cutoff = FULL
            .find("#[cfg(test)]")
            .expect("file must have a #[cfg(test)] section");
        // The str literal outlives `cutoff`; slicing gives a &'static str.
        #[allow(clippy::manual_unwrap_or_default)]
        {
            &FULL[..cutoff]
        }
    }

    /// Isolate the body of a named function inside production source.
    /// Brace matching is byte-level and does not skip string literals;
    /// the scanned production bodies contain no braces in strings.
    fn extract_fn_body<'a>(source: &'a str, signature_prefix: &str) -> &'a str {
        let start = source
            .find(signature_prefix)
            .unwrap_or_else(|| panic!("could not find {signature_prefix}"));
        // Find the opening `{` of the body.
        let brace = source[start..].find('{').expect("fn sig must have body");
        let body_start = start + brace + 1;
        // Walk to the matching closing brace, tracking nesting.
        let bytes = source.as_bytes();
        let mut depth: i32 = 1;
        let mut i = body_start;
        while i < bytes.len() {
            match bytes[i] {
                b'{' => depth += 1,
                b'}' => {
                    depth -= 1;
                    if depth == 0 {
                        return &source[body_start..i];
                    }
                }
                _ => {}
            }
            i += 1;
        }
        panic!("unterminated fn body for {signature_prefix}");
    }

    /// Bug #3 reproduction: the legacy Response{Found} branch at
    /// `get.rs:2218-2241` reads the local store via
    /// `ContractHandlerEvent::GetQuery` FIRST, compares the stored bytes
    /// against the incoming `value`, and skips the `PutQuery` entirely
    /// when they match. This prevents re-invoking `update_state()` on
    /// contracts that implement idempotency checks (see #2018). The
    /// task-per-tx driver's `cache_contract_locally` must replicate
    /// this idempotency short-circuit.
    #[test]
    fn cache_contract_locally_has_state_matches_short_circuit() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn cache_contract_locally(");
        // A proper idempotency short-circuit reads the local state
        // first (GetQuery BEFORE PutQuery) and compares bytes.
        let get_pos = body
            .find("ContractHandlerEvent::GetQuery")
            .unwrap_or(usize::MAX);
        let put_pos = body
            .find("ContractHandlerEvent::PutQuery")
            .unwrap_or(usize::MAX);
        let has_byte_compare = body.contains("as_ref() ==") || body.contains("state_matches");
        let has_short_circuit = get_pos < put_pos && has_byte_compare;
        assert!(
            has_short_circuit,
            "cache_contract_locally is missing the state_matches idempotency \
             short-circuit from the legacy Response{{Found}} branch \
             (get.rs:2218-2241). Without it the driver re-invokes PutQuery \
             on identical state — regressing issue #2018 for contracts \
             that enforce idempotency in update_state()."
        );
    }

    /// Bug #4 reproduction: the legacy branch at `get.rs:2420-2435`
    /// logs on PutQuery error but **continues** to run the hosting /
    /// interest / access-tracking side effects. Hosting LRU / TTL
    /// must refresh for ANY successful wire-level GET, not only when
    /// the local store write succeeded.
    ///
    /// After the fix the side effects live OUTSIDE the PutQuery match:
    /// the match result feeds a `put_persisted: bool` and
    /// `record_get_access` / `announce_contract_hosted` run after the
    /// match closes — reachable from state-matches, PutQuery-Ok, and
    /// PutQuery-Err paths alike. We pin that structure here by
    /// requiring the `record_get_access` call to appear AFTER the
    /// PutResponse match arms (identified by the `Err(err)` arm) in
    /// the source order.
    #[test]
    fn cache_contract_locally_runs_side_effects_on_put_error() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn cache_contract_locally(");
        let err_arm = body
            .find("new_value: Err(")
            .expect("PutResponse Err arm must exist");
        let side_effect = body
            .find("record_get_access")
            .expect("record_get_access must be called");
        assert!(
            side_effect > err_arm,
            "record_get_access must run AFTER the PutResponse match \
             (outside both Ok and Err arms) so hosting LRU/TTL refresh on \
             any successful wire-level GET — including when the local \
             executor rejects the PutQuery. The legacy branch at \
             get.rs:2420-2435 continues these side effects on error; \
             the driver must match."
        );
    }

    /// Bug #2 reproduction (source-level): the legacy branch calls
    /// `auto_subscribe_on_get_response` for any client-initiated GET
    /// when `AUTO_SUBSCRIBE_ON_GET` is true (see `get.rs:2313, 2408`).
    /// The driver currently never calls it; `maybe_subscribe_child`
    /// only handles the explicit `subscribe=true` flag. This test
    /// pairs with `test_driver_inline_get_triggers_auto_subscribe`
    /// (integration) to lock down both the absence and the symptom.
    #[test]
    fn driver_calls_auto_subscribe_on_get_response() {
        let src = production_source();
        assert!(
            src.contains("auto_subscribe_on_get_response"),
            "The driver must invoke `auto_subscribe_on_get_response` on \
             successful GET terminal paths (AUTO_SUBSCRIBE_ON_GET = true in \
             ring.rs:60). The legacy branch does this at get.rs:2313/2408/3136/3185; \
             the driver must mirror it so client GETs with subscribe=false \
             still register the fallback subscription."
        );
    }

    /// Bug #5 reproduction (source-level): `record_op_result` in the
    /// Done arm must NOT be emitted unconditionally as success — if
    /// `build_host_response` returned `Err`, the telemetry should
    /// reflect failure. Otherwise the router's prediction model and
    /// the network_status dashboard say "GET succeeded" while the
    /// client sees an OperationError.
    #[test]
    fn record_op_result_reflects_host_result_outcome() {
        const SOURCE: &str = include_str!("op_ctx_task.rs");
        // Find the Done arm (the `RetryLoopOutcome::Done` branch).
        let done_arm_start = SOURCE
            .find("RetryLoopOutcome::Done(")
            .expect("Done arm must exist");
        let next_arm = SOURCE[done_arm_start..]
            .find("RetryLoopOutcome::Exhausted")
            .expect("Exhausted arm must follow");
        let arm = &SOURCE[done_arm_start..done_arm_start + next_arm];
        // Locate the record_op_result call inside the arm.
        let call_pos = arm
            .find("record_op_result")
            .expect("record_op_result must be called in Done arm");
        // Get the surrounding ~200 chars to inspect the success flag.
        let tail = &arm[call_pos..];
        let call_window = &tail[..tail.len().min(200)];
        // Unconditional `true` is the bug. A proper implementation
        // derives the success flag from `host_result.is_ok()` or a
        // similarly named value.
        let looks_unconditional = call_window.contains("true,") && !call_window.contains("is_ok()");
        assert!(
            !looks_unconditional,
            "record_op_result in the Done arm is passed an unconditional \
             `true`. The success flag must track `host_result.is_ok()` so \
             telemetry does not diverge from the client-visible outcome. \
             Call window: {call_window}"
        );
    }

    /// Non-bug: per #3757, the node ALWAYS requests contract code on
    /// the wire regardless of what the client asked for, so it can
    /// cache WASM for validation/hosting. The driver matches legacy
    /// `start_op` behaviour (`get.rs:59` hard-codes the same value).
    /// This test pins the intentional choice so a future refactor
    /// doesn't silently reintroduce client-flag pass-through.
    #[test]
    fn driver_hardcodes_fetch_contract_true_per_issue_3757() {
        let src = production_source();
        let build_body = extract_fn_body(
            src,
            "fn build_request(&mut self, attempt_tx: Transaction) -> NetMessage {",
        );
        assert!(
            build_body.contains("fetch_contract: true,"),
            "GetMsg::Request.fetch_contract must stay hard-coded `true` — \
             the node needs WASM for local validation/hosting regardless of \
             the client's return_contract_code preference (issue #3757 / \
             get.rs:52-55)."
        );
    }

    /// Bug #1 regression: `Terminal::Streaming` in the Done arm must
    /// invoke `assemble_and_cache_stream` so that streamed GET
    /// responses actually write the contract state into the local
    /// executor. Without this call, a cold-cache client GET of a
    /// >threshold contract succeeds on the wire but leaves the
    /// originator's local store empty — the client gets
    /// `OperationError` via `build_host_response`'s re-query miss.
    ///
    /// The simulation-level driver-isolation tests for this path are
    /// `#[ignore]`'d pending infrastructure work (#3883); this
    /// source-scrape pins the wiring so the call can't be silently
    /// removed.
    #[test]
    fn streaming_terminal_calls_assemble_and_cache_stream() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn drive_client_get_inner(");
        // Find the `Terminal::Streaming` arm of the Done match.
        let arm = body
            .find("Terminal::Streaming {")
            .expect("Done arm must handle Terminal::Streaming");
        // The matching arm must call `assemble_and_cache_stream`.
        let tail = &body[arm..];
        // Bound the search to this arm by clipping at the next
        // `Terminal::` match arm.
        let arm_end = tail[1..]
            .find("Terminal::")
            .map(|p| p + 1)
            .unwrap_or(tail.len());
        let arm_body = &tail[..arm_end];
        assert!(
            arm_body.contains("assemble_and_cache_stream"),
            "Terminal::Streaming arm of drive_client_get_inner must call \
             `assemble_and_cache_stream`. Without this, cold-cache streaming \
             GETs return OperationError because nothing writes the local \
             store. See bug #1 in PR #3884 review."
        );
    }

    /// Pure-data regression test for the streaming payload shape the
    /// driver deserializes. Locks down the invariant that
    /// `GetStreamingPayload` round-trips via bincode, so a regression
    /// that changes the wire format would break `assemble_and_cache_stream`
    /// loudly at this level instead of silently producing an empty
    /// store (bug #1 class).
    #[test]
    fn streaming_payload_round_trips_via_bincode() {
        let key = dummy_key();
        let state_bytes = vec![0x42u8; 512];
        let payload = GetStreamingPayload {
            key,
            value: StoreResponse {
                state: Some(WrappedState::new(state_bytes.clone())),
                contract: None,
            },
        };
        let encoded = bincode::serialize(&payload).expect("bincode encode");
        let decoded: GetStreamingPayload = bincode::deserialize(&encoded).expect("bincode decode");
        assert_eq!(decoded.key, key);
        assert_eq!(
            decoded.value.state.as_ref().map(|s| s.as_ref().to_vec()),
            Some(state_bytes),
            "state bytes must round-trip through the streaming payload"
        );
    }

    /// Bug #1 follow-through: `assemble_and_cache_stream` must claim
    /// the stream by `(peer_addr, stream_id)`, await assembly, and
    /// check the key matches before caching. The source-scrape
    /// verifies the function's structure hasn't been simplified in a
    /// way that would skip any of those steps.
    #[test]
    fn assemble_and_cache_stream_performs_claim_assemble_key_check() {
        let src = production_source();
        let body = extract_fn_body(src, "async fn assemble_and_cache_stream(");

        assert!(
            body.contains("orphan_stream_registry") && body.contains("claim_or_wait"),
            "assemble_and_cache_stream must claim the stream via \
             orphan_stream_registry().claim_or_wait()"
        );
        assert!(
            body.contains(".assemble()") && body.contains(".await"),
            "assemble_and_cache_stream must await stream assembly"
        );
        assert!(
            body.contains("GetStreamingPayload") && body.contains("bincode::deserialize"),
            "assemble_and_cache_stream must deserialize the payload \
             as GetStreamingPayload"
        );
        assert!(
            body.contains("payload.key != expected_key"),
            "assemble_and_cache_stream must verify the stream payload's \
             key matches the expected ContractKey — a mismatch would \
             silently cache the wrong contract under the expected key"
        );
        assert!(
            body.contains("cache_contract_locally"),
            "assemble_and_cache_stream must delegate the actual write \
             and hosting side effects to cache_contract_locally"
        );
    }

    /// Guard against subscribe firing when the client did not request it.
    /// Source-scrape to verify `maybe_subscribe_child` short-circuits on
    /// `!subscribe`. Mirrors the spirit of PUT 3a's
    /// `finalize_put_at_originator_never_subscribes_from_driver`.
    #[test]
    fn maybe_subscribe_child_short_circuits_on_false() {
        const SOURCE: &str = include_str!("op_ctx_task.rs");
        let fn_start = SOURCE
            .find("async fn maybe_subscribe_child(")
            .expect("maybe_subscribe_child must exist");
        let body = &SOURCE[fn_start..];
        let early_return = body
            .find("if !subscribe {")
            .expect("maybe_subscribe_child must short-circuit on !subscribe");
        let register_call = body
            .find("expect_and_register_sub_operation")
            .expect("maybe_subscribe_child must register sub-operation");
        assert!(
            early_return < register_call,
            "The !subscribe short-circuit must come BEFORE the \
             expect_and_register_sub_operation call — otherwise we'd \
             register a spurious sub-op for a client who didn't ask for \
             one. See PUT 3a commit 494a3c69 for the analogous bug class."
        );
    }
}