1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! Two-phase effect patterns for network operations.
//!
//! This module provides adapters and patterns for making network operations
//! follow the two-phase reserve/commit pattern required by the asupersync runtime.
//!
//! # Pattern Implementation
//!
//! Network streams should implement the `TwoPhaseNetworkSend` trait to ensure
//! cancel-safe operation:
//!
//! ```ignore
//! use asupersync::runtime::effects::SendPermit;
//!
//! trait TwoPhaseNetworkSend {
//! type Error;
//!
//! async fn reserve_send(&mut self) -> Result<SendPermit<Self::Error>, Self::Error>;
//! }
//! ```
//!
//! # Migration Guide
//!
//! Network operations that currently use direct send patterns should be migrated:
//!
//! ```ignore
//! // BEFORE: Direct send (violates runtime invariant)
//! impl AtpH3Stream {
//! pub fn send(&mut self, data: &[u8]) -> Result<(), AtpH3Error> {
//! // Direct queue operation - NOT cancel-safe
//! self.send_queue.push_back(data.to_vec());
//! Ok(())
//! }
//! }
//!
//! // AFTER: Two-phase send (follows runtime invariant)
//! impl AtpH3Stream {
//! pub async fn reserve_send(&mut self) -> Result<SendPermit<AtpH3Error>, AtpH3Error> {
//! // Check if we can send and reserve space
//! if !self.can_send() {
//! return Err(AtpH3Error::Stream(format!(
//! "Cannot send on stream {} in state {:?}",
//! self.stream_id, self.state
//! )));
//! }
//!
//! if self.send_queue.len() >= self.send_queue_high_water {
//! return Err(AtpH3Error::Stream(
//! "Send queue full - apply backpressure".to_string(),
//! ));
//! }
//!
//! // Reserve space by incrementing a reserved count
//! self.reserved_sends += 1;
//!
//! let stream_id = self.stream_id;
//! let send_queue = &mut self.send_queue;
//! let reserved_sends = &mut self.reserved_sends;
//! let max_buffer_size = self.max_buffer_size;
//!
//! Ok(SendPermit::new(
//! move |data: &[u8]| {
//! // Commit: add to send queue
//! if data.len() > max_buffer_size {
//! return Err(AtpH3Error::Stream(format!(
//! "Data size {} exceeds maximum buffer size {}",
//! data.len(),
//! max_buffer_size
//! )));
//! }
//! send_queue.push_back(data.to_vec());
//! *reserved_sends -= 1;
//! Ok(())
//! },
//! move || {
//! // Abort: release reservation
//! *reserved_sends -= 1;
//! }
//! ))
//! }
//! }
//! ```
use SendPermit;
/// Trait for network streams that support two-phase send operations.
/// Trait for streams that use the legacy direct-send pattern.
///
/// This trait identifies streams that need to be migrated to the two-phase pattern.
/// Implementing this trait is a temporary step during migration.