wrest 0.5.6

Async HTTP client for Windows backed by WinHTTP, with a reqwest-compatible API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
//! Reusable callback -> Future bridge for Win32 async APIs.
//!
//! This module provides three primitives that turn any Win32 callback-based API
//! into idiomatic Rust `Future`s:
//!
//! - `CallbackContext` -- safe `Arc` <-> `usize` lifecycle for `DWORD_PTR` context values
//! - [`CompletionSignal<T>`] -- reusable one-shot channel bridging callback to future
//! - [`await_win32()`] -- async combinator: install listener -> call Win32 -> await callback
//!
//! **No Win32 knowledge** -- the bridge is generic. It solves the problem:
//! "an OS callback fires on an arbitrary thread, I want a Rust `Future` to resolve."
//!
//! **Executor-agnostic** -- uses `futures_channel::oneshot`, works on tokio, async-std,
//! or bare `block_on`.

use crate::util::lock_or_clear;
use futures_channel::oneshot;
use std::{
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll},
};

// ---------------------------------------------------------------------------
// CallbackContext -- RAII Arc <-> usize lifecycle
// ---------------------------------------------------------------------------

/// Owning handle for one strong reference to an `Arc<T>`, stored as the
/// pointer's integer value so it can be handed to Win32 as a callback
/// context (`DWORD_PTR` / `PVOID`).
///
/// Lifecycle:
///
/// - [`Self::new`] clones the caller's `Arc`, so the strong count goes up
///   by exactly one.
/// - Dropping the wrapper gives that reference back (strong count down by
///   one).
/// - [`Self::into_raw`] consumes the wrapper **without** releasing the
///   reference.  Ownership then belongs to whoever holds the returned
///   value -- typically the OS callback -- which must call
///   [`Self::drop_raw`] exactly once when it is finished.
///
/// From inside a callback, reach the value with [`Self::borrow_raw`]
/// (refcount untouched) or [`Self::clone_arc_from_raw`] (one additional
/// strong reference owned by the caller).
///
/// The `PhantomData<Arc<T>>` field makes the wrapper inherit the
/// auto-traits of `Arc<T>`: `CallbackContext<T>: Send + Sync` iff
/// `T: Send + Sync`, the same as the strong reference it stands for.
pub(crate) struct CallbackContext<T> {
    /// Address returned by `Arc::into_raw`; represents one strong reference.
    addr: usize,
    /// Zero-sized marker tying auto-traits to `Arc<T>`.
    _retained: PhantomData<Arc<T>>,
}

impl<T> CallbackContext<T> {
    /// Retain one extra strong reference to `arc` and remember it by address.
    ///
    /// The caller's `Arc` is only borrowed; the strong count rises by one.
    pub fn new(arc: &Arc<T>) -> Self {
        let retained: *const T = Arc::into_raw(Arc::clone(arc));
        Self {
            addr: retained as usize,
            _retained: PhantomData,
        }
    }

    /// The pointer bits to pass as a Win32 `DWORD_PTR` / `PVOID` callback
    /// context.  Ownership of the retained reference stays with `self`.
    pub fn as_raw(&self) -> usize {
        self.addr
    }

    /// Give up the wrapper while keeping the strong reference retained.
    ///
    /// The strong count is **not** decremented.  Use this when an OS
    /// callback takes over ownership; its final invocation is expected to
    /// call [`Self::drop_raw`], and the `Arc<T>` stays alive until then.
    pub fn into_raw(self) -> usize {
        // `ManuallyDrop` suppresses our `Drop` impl, so the retained
        // reference is deliberately left outstanding.
        std::mem::ManuallyDrop::new(self).addr
    }

    /// Borrow the value behind a raw context pointer without touching the
    /// reference count.
    ///
    /// The returned reference is meant to be used for the duration of the
    /// callback invocation only.
    ///
    /// # Safety
    /// - `raw` must come from [`Self::as_raw`] or [`Self::into_raw`] of a
    ///   `CallbackContext<T>` with this same `T`.
    /// - The retained reference behind `raw` must not have been released
    ///   yet (neither by dropping the wrapper nor via [`Self::drop_raw`])
    ///   and must stay retained for as long as the returned borrow is used.
    pub unsafe fn borrow_raw<'a>(raw: usize) -> &'a T {
        let inner = raw as *const T;
        // SAFETY: per the contract above, `inner` points into a live
        // `Arc<T>` allocation that outlives the returned borrow.
        unsafe { &*inner }
    }

    /// Produce a new, independently owned `Arc<T>` from a raw context
    /// pointer.
    ///
    /// The strong count goes up by one and the retained reference behind
    /// `raw` is left as it was.  Handy when a callback must keep the value
    /// alive across something that may release the raw pointer (e.g. the
    /// final callback calling [`Self::drop_raw`]).
    ///
    /// # Safety
    /// - `raw` must come from [`Self::as_raw`] or [`Self::into_raw`] of a
    ///   `CallbackContext<T>` with this same `T`.
    /// - The retained reference behind `raw` must not have been released
    ///   yet.
    pub unsafe fn clone_arc_from_raw(raw: usize) -> Arc<T> {
        let inner = raw as *const T;
        // SAFETY: per the contract above, `inner` came from `Arc::into_raw`
        // and its reference is still retained.  Wrapping the reconstructed
        // `Arc` in `ManuallyDrop` guarantees we never release the retained
        // reference here; only the clone is handed out.
        unsafe {
            let retained = std::mem::ManuallyDrop::new(Arc::from_raw(inner));
            Arc::clone(&*retained)
        }
    }

    /// Release the strong reference retained behind a raw context pointer.
    ///
    /// Intended for the **final** OS callback (e.g.,
    /// `WINHTTP_CALLBACK_STATUS_HANDLE_CLOSING`) that owns a context
    /// previously handed over with [`Self::into_raw`].
    ///
    /// # Safety
    /// - `raw` must come from [`Self::into_raw`] (or otherwise represent a
    ///   retained reference that no live wrapper will release) for a
    ///   `CallbackContext<T>` with this same `T`.
    /// - Must be called exactly once per retained reference.
    pub unsafe fn drop_raw(raw: usize) {
        // SAFETY: per the contract above, `raw` stands for exactly one
        // outstanding strong reference obtained from `Arc::into_raw`.
        unsafe {
            Arc::decrement_strong_count(raw as *const T);
        }
    }
}

impl<T> Drop for CallbackContext<T> {
    /// Give back the strong reference taken in [`CallbackContext::new`].
    fn drop(&mut self) {
        // SAFETY: `self.addr` was produced in `new` and is still retained:
        // the only consuming exit, `into_raw`, prevents this `Drop` from
        // running at all.
        unsafe {
            Self::drop_raw(self.addr);
        }
    }
}

// ---------------------------------------------------------------------------
// CompletionSignal -- reusable one-shot channel
// ---------------------------------------------------------------------------

/// A reusable one-shot channel that bridges Win32 callbacks to Rust Futures.
///
/// Before each async operation, call [`listen()`](Self::listen) to get a `Future`.
/// From the callback, call [`signal()`](Self::signal) to resolve that `Future`.
///
/// Only one listener is active at a time -- this matches Win32's sequential
/// async model where you must wait for one callback before starting the
/// next operation.  Installing a new listener while an older one is still
/// pending replaces the older one's sender.
pub struct CompletionSignal<T: Send> {
    // Sending half for the currently installed listener: filled by
    // `listen()`, emptied by `signal()`, `None` when nobody is listening.
    sender: Mutex<Option<oneshot::Sender<T>>>,
}

/// Future returned by [`CompletionSignal::listen()`].
///
/// Resolves to `Ok(value)` once the matching sender delivers a value, or to
/// `Err(SignalCancelled)` if that sender is dropped without sending.
/// Wraps the receiving half of the one-shot channel.
pub struct CompletionFuture<T>(oneshot::Receiver<T>);

/// Error produced when a listener's sender goes away before delivering a
/// value (e.g., the Win32 handle was closed, cancelling in-flight
/// operations).
///
/// Carries no data; it only records that the completion never arrived.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalCancelled;

impl std::fmt::Display for SignalCancelled {
    /// Writes a fixed, human-readable description of the cancellation.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(out, "completion signal cancelled (sender dropped)")
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for SignalCancelled {}

impl<T: Send> CompletionSignal<T> {
    /// Build a `CompletionSignal` whose sender slot is empty, i.e. with no
    /// listener installed yet.
    pub fn new() -> Self {
        let sender = Mutex::new(None);
        Self { sender }
    }

    /// Arm the signal for the next operation and return the future that
    /// [`signal()`](Self::signal) will resolve.
    ///
    /// A listener left over from an earlier operation (e.g., the Win32
    /// function failed synchronously, or its future was dropped) is
    /// silently replaced, so every cancellation and error path leaves the
    /// signal usable for the next call.
    pub fn listen(&self) -> CompletionFuture<T> {
        let (fresh_tx, fresh_rx) = oneshot::channel();
        // Recovering from a poisoned lock is fine here: the protected data
        // is a single `Option<Sender>` slot that is only ever `.replace()`d
        // or `.take()`n, so there is no multi-field invariant to break.
        // The lock is released at the end of this statement.
        let stale = lock_or_clear(&self.sender).replace(fresh_tx);
        if stale.is_some() {
            debug!("CompletionSignal::listen: replacing unconsumed sender");
        }
        CompletionFuture(fresh_rx)
    }

    /// Deliver `value` to the currently installed listener, if any.
    ///
    /// Intended to be called from the completion callback.  The lock is
    /// held only for the `Option::take`; with no listener installed the
    /// call does nothing.
    pub fn signal(&self, value: T) {
        // Poison recovery is acceptable for the same reason as in
        // `listen`: the slot is a lone `Option<Sender>`.
        //
        // The sender is taken out in its own statement so the lock is
        // released *before* sending: sending wakes the receiver, and a
        // custom inline-polling waker could re-enter `signal` and deadlock
        // if the lock were still held.
        let pending = lock_or_clear(&self.sender).take();
        let Some(tx) = pending else {
            return; // nobody is listening
        };
        // A send error only means the receiver was dropped (cancelled) --
        // that's fine.
        let _ = tx.send(value);
    }
}

impl<T: Send> Default for CompletionSignal<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Future for CompletionFuture<T> {
    type Output = Result<T, SignalCancelled>;

    /// Poll the wrapped receiver, translating a dropped sender into
    /// [`SignalCancelled`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // No `unsafe` pin projection is involved: `oneshot::Receiver` is
        // `Unpin`, so the field can be reached through the pin and re-pinned
        // with the safe `Pin::new`.
        let receiver = Pin::new(&mut self.0);
        match receiver.poll(cx) {
            Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
            Poll::Ready(Err(_)) => Poll::Ready(Err(SignalCancelled)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// ---------------------------------------------------------------------------
// await_win32 -- the async combinator
// ---------------------------------------------------------------------------

/// Kick off a Win32 async operation and wait for its completion callback.
///
/// Steps, in order:
///
/// 1. `signal.listen()` installs a fresh listener, so a callback that fires
///    while `start_op` is still running is not missed.
/// 2. `start_op()` begins the Win32 operation.
/// 3. On `Ok(())` the listener is awaited and its value returned; a
///    cancelled listener is converted with `E::from(SignalCancelled)`.
///    On `Err(e)` the error is returned immediately; the listener is
///    abandoned and will be replaced by the next `listen()` call.
///
/// Step 3's error branch covers the critical WinHTTP edge case: when a
/// function like `WinHttpSendRequest` returns `FALSE` with
/// `GetLastError() != ERROR_IO_PENDING`, the failure is synchronous and no
/// callback fires.
pub async fn await_win32<T, E, F>(signal: &CompletionSignal<T>, start_op: F) -> Result<T, E>
where
    T: Send,
    E: From<SignalCancelled>,
    F: FnOnce() -> Result<(), E>,
{
    let completion = signal.listen();
    // Synchronous failure: bail out now, leaving the listener abandoned.
    start_op()?;
    // `?` converts `SignalCancelled` into the caller's error type.
    let value = completion.await?;
    Ok(value)
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A value signaled from a different thread resolves the listener.
    #[test]
    fn signal_from_another_thread() {
        let signal = Arc::new(CompletionSignal::<u32>::new());
        let signal2 = Arc::clone(&signal);
        let future = signal.listen();

        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            signal2.signal(42);
        });

        let result = futures_executor::block_on(future);
        assert_eq!(result, Ok(42));
    }

    /// A value sent before the first poll is still delivered.
    #[test]
    fn signal_before_poll_still_works() {
        let signal: CompletionSignal<u32> = Default::default();
        let future = signal.listen();
        signal.signal(7); // Signal before the future is polled
        assert_eq!(futures_executor::block_on(future), Ok(7));
    }

    /// Dropping the signal drops the stored sender, cancelling the listener.
    #[test]
    fn dropped_sender_returns_cancelled() {
        let signal = CompletionSignal::<u32>::new();
        let future = signal.listen();
        drop(signal); // Sender dropped without signaling
        assert_eq!(futures_executor::block_on(future), Err(SignalCancelled));
    }

    /// A second `listen()` replaces the first sender; the value goes to the
    /// newest listener.
    #[test]
    fn replaced_listener_does_not_panic() {
        let signal = CompletionSignal::<u32>::new();
        let _fut1 = signal.listen(); // Never consumed
        let fut2 = signal.listen(); // Replaces fut1's sender silently
        signal.signal(99);
        assert_eq!(futures_executor::block_on(fut2), Ok(99));
    }

    /// Simulates the Win32 pattern: listen -> signal -> listen -> signal -> ...
    #[test]
    fn sequential_reuse() {
        let signal = CompletionSignal::<&str>::new();

        let f1 = signal.listen();
        signal.signal("step1");
        assert_eq!(futures_executor::block_on(f1), Ok("step1"));

        let f2 = signal.listen();
        signal.signal("step2");
        assert_eq!(futures_executor::block_on(f2), Ok("step2"));
    }

    /// Dropping the wrapper gives back the strong reference taken by `new`.
    #[test]
    fn callback_context_drop_releases() {
        let state = Arc::new(String::from("hello"));
        let ctx = CallbackContext::<String>::new(&state);
        assert_eq!(Arc::strong_count(&state), 2);

        // SAFETY: `ctx` is live and `raw` came from it.
        unsafe {
            let s: &String = CallbackContext::<String>::borrow_raw(ctx.as_raw());
            assert_eq!(s, "hello");
        }

        // Drop releases the retained strong ref automatically.
        drop(ctx);
        assert_eq!(Arc::strong_count(&state), 1);
        assert_eq!(*state, "hello"); // Still valid
    }

    /// `into_raw` keeps the reference retained until `drop_raw` releases it.
    #[test]
    fn callback_context_into_raw_skips_drop() {
        let state = Arc::new(String::from("hello"));
        let ctx = CallbackContext::<String>::new(&state);
        assert_eq!(Arc::strong_count(&state), 2);

        // Caller asserts an OS callback now owns the retained reference.
        let raw = ctx.into_raw();
        assert_eq!(Arc::strong_count(&state), 2);

        // SAFETY: the retained reference is still alive (we just forgot
        // the wrapper), so cloning the inner `Arc` is sound.
        let extra = unsafe { CallbackContext::<String>::clone_arc_from_raw(raw) };
        assert_eq!(Arc::strong_count(&state), 3);
        drop(extra);
        assert_eq!(Arc::strong_count(&state), 2);

        // SAFETY: `raw` is still the live retained reference; drop it
        // exactly once.
        unsafe { CallbackContext::<String>::drop_raw(raw) };
        assert_eq!(Arc::strong_count(&state), 1);
        assert_eq!(*state, "hello");
    }

    /// Custom error type for testing `await_win32`.
    #[derive(Debug, PartialEq)]
    enum TestError {
        Sync(&'static str),
        Cancelled,
    }

    impl From<SignalCancelled> for TestError {
        fn from(_: SignalCancelled) -> Self {
            TestError::Cancelled
        }
    }

    /// A synchronous `start_op` failure is returned as-is, without waiting
    /// for a callback that will never come.
    #[test]
    fn await_win32_sync_failure_skips_callback() {
        let signal = CompletionSignal::<u32>::new();
        let result = futures_executor::block_on(await_win32(&signal, || {
            Err::<(), _>(TestError::Sync("sync fail"))
        }));
        assert_eq!(result, Err(TestError::Sync("sync fail")));
        // The listener was abandoned -- next listen() should work fine
        let _fut = signal.listen();
    }

    /// `await_win32` installs its listener *before* running `start_op`, so
    /// a completion delivered from inside `start_op` (simulating a callback
    /// that fires immediately) is picked up -- no second thread required.
    #[test]
    fn await_win32_success() {
        let signal = CompletionSignal::<u32>::new();
        let result = futures_executor::block_on(await_win32(&signal, || {
            signal.signal(42);
            Ok::<(), TestError>(())
        }));
        assert_eq!(result, Ok(42));
    }

    /// A completion delivered from another thread after `start_op` returns
    /// resolves `await_win32`.
    #[test]
    fn await_win32_threaded_success() {
        let signal = Arc::new(CompletionSignal::<u32>::new());
        let signal2 = Arc::clone(&signal);

        let result = futures_executor::block_on(await_win32(&signal, || {
            // Start a thread that will signal after a brief delay
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_millis(10));
                signal2.signal(99);
            });
            Ok::<(), TestError>(())
        }));
        assert_eq!(result, Ok(99));
    }

    /// If the sender `await_win32` is waiting on is dropped after
    /// `start_op` succeeds, the result goes through the
    /// `From<SignalCancelled>` conversion.
    #[test]
    fn await_win32_cancelled_signal() {
        let signal = CompletionSignal::<u32>::new();
        let result = futures_executor::block_on(await_win32(&signal, || {
            // Installing another listener replaces -- and thereby drops --
            // the sender that `await_win32` just installed, without ever
            // signaling it.
            drop(signal.listen());
            Ok::<(), TestError>(())
        }));
        assert_eq!(result, Err(TestError::Cancelled));
    }
}