1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
use std::time::Duration;

use blocking_permit::Semaphore;
use body_image::Tunables;

/// An additional set of tuning constants for asynchronous I/O, extending the
/// [`body_image::Tunables`] set.
///
/// Instances are read-only: values are exposed through getter methods of the
/// same name as each setting. Setters are available via [`FutioTuner`] (a
/// builder type).
#[derive(Debug, Clone)]
pub struct FutioTunables {
    // The base (body-image) tunables that this set extends.
    image: Tunables,
    // Maximum initial response timeout interval; `None` means unset.
    res_timeout:     Option<Duration>,
    // Maximum streaming body timeout interval; `None` means unset.
    body_timeout:    Option<Duration>,
    // How any required blocking operations are to be run.
    blocking_policy: BlockingPolicy,
    // Maximum stream item buffer size in bytes, when using `SplitBodyImage`.
    stream_item_size: usize,
}

/// The policy for blocking operations.
///
/// Selects one of three ways to run a blocking operation. The value is
/// `Copy`, so it can be passed around freely; the `Permit` variant only
/// carries a `'static` reference.
#[derive(Debug, Copy, Clone)]
pub enum BlockingPolicy {
    /// Always run blocking operations directly and without further
    /// coordination.
    Direct,

    /// Acquire a `BlockingPermit` from the referenced `Semaphore` and use this
    /// to run the blocking operation on the _current_ thread.
    ///
    /// The `Semaphore` must live for the `'static` lifetime.
    Permit(&'static Semaphore),

    /// Dispatch blocking operations to the [`blocking_permit::DispatchPool`]
    /// registered on the current thread.
    Dispatch,
}

impl FutioTunables {
    /// Build an instance populated with the default value of every setting.
    ///
    /// The individual defaults are listed on the corresponding getters.
    pub fn new() -> FutioTunables {
        let default_body_timeout = Duration::from_secs(60);

        Self {
            image: Tunables::new(),
            res_timeout: None,
            body_timeout: Some(default_body_timeout),
            blocking_policy: BlockingPolicy::Direct,
            stream_item_size: 512 * 1024,
        }
    }

    /// Borrow the base (body-image) `Tunables`.
    ///
    /// Default: As per body-image crate defaults.
    pub fn image(&self) -> &Tunables {
        &self.image
    }

    /// The upper bound, in bytes, on the buffer size of a single stream item
    /// when using `SplitBodyImage`.
    ///
    /// Default: 512 KiB.
    pub fn stream_item_size(&self) -> usize {
        self.stream_item_size
    }

    /// The maximum interval to wait for the initial response, if any.
    ///
    /// Default: None (e.g. unset)
    pub fn res_timeout(&self) -> Option<Duration> {
        self.res_timeout
    }

    /// The maximum interval allowed for streaming a body, if any.
    ///
    /// Default: 60 seconds
    pub fn body_timeout(&self) -> Option<Duration> {
        self.body_timeout
    }

    /// The `Semaphore` used to constrain the number of concurrent blocking
    /// operations.
    ///
    /// This is `Some` only when the blocking policy is
    /// `BlockingPolicy::Permit`; every other policy yields `None`.
    ///
    /// Default: None
    pub fn blocking_semaphore(&self) -> Option<&'static Semaphore> {
        match self.blocking_policy {
            BlockingPolicy::Permit(sema) => Some(sema),
            BlockingPolicy::Direct | BlockingPolicy::Dispatch => None,
        }
    }

    /// The policy applied to any required blocking operations.
    ///
    /// Default: `BlockingPolicy::Direct`
    pub fn blocking_policy(&self) -> BlockingPolicy {
        self.blocking_policy
    }
}

impl Default for FutioTunables {
    fn default() -> Self { FutioTunables::new() }
}

impl AsRef<Tunables> for FutioTunables {
    /// Borrow the base (body-image) `Tunables` held by this set.
    fn as_ref(&self) -> &Tunables {
        &self.image
    }
}

/// A builder for [`FutioTunables`].
///
/// The builder holds a template `FutioTunables` which the setters modify in
/// place; `finish` returns a copy of that template, so a single builder may
/// be finished more than once.
///
/// Invariants are asserted in the various setters and `finish`.
#[derive(Debug, Clone)]
pub struct FutioTuner {
    // The configuration under construction; cloned out by `finish`.
    template: FutioTunables,
}

impl FutioTuner {
    /// Construct with defaults, as per `FutioTunables::new`.
    pub fn new() -> FutioTuner {
        FutioTuner { template: FutioTunables::new() }
    }

    /// Set the base body-image `Tunables`.
    pub fn set_image(&mut self, image: Tunables) -> &mut FutioTuner {
        self.template.image = image;
        self
    }

    /// Set the maximum stream item buffer size in bytes, when using
    /// `SplitBodyImage`.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn set_stream_item_size(&mut self, size: usize) -> &mut FutioTuner {
        assert!(size > 0, "stream_item_size must be greater than zero");
        self.template.stream_item_size = size;
        self
    }

    /// Set the maximum initial response timeout interval.
    ///
    /// The relation to the body timeout is not checked here but in `finish`.
    pub fn set_res_timeout(&mut self, dur: Duration) -> &mut FutioTuner {
        self.template.res_timeout = Some(dur);
        self
    }

    /// Unset (e.g. disable) response timeout
    pub fn unset_res_timeout(&mut self) -> &mut FutioTuner {
        self.template.res_timeout = None;
        self
    }

    /// Set the maximum streaming body timeout interval.
    ///
    /// The relation to the response timeout is not checked here but in
    /// `finish`.
    pub fn set_body_timeout(&mut self, dur: Duration) -> &mut FutioTuner {
        self.template.body_timeout = Some(dur);
        self
    }

    /// Unset (e.g. disable) body timeout
    pub fn unset_body_timeout(&mut self) -> &mut FutioTuner {
        self.template.body_timeout = None;
        self
    }

    /// Set policy for blocking. Note that below the highest level interfaces
    /// such as `request_dialog` and `fetch`, setting this should be combined
    /// with using the appropriate `Stream` or `Sink` types, e.g. using
    /// `PermitBodyStream` with `BlockingPolicy::Permit`.
    pub fn set_blocking_policy(&mut self, policy: BlockingPolicy)
        -> &mut FutioTuner
    {
        self.template.blocking_policy = policy;
        self
    }

    /// Finish building, asserting any remaining invariants, and return a new
    /// `FutioTunables` instance.
    ///
    /// The builder itself is left untouched; the returned value is a clone of
    /// its current state.
    ///
    /// # Panics
    ///
    /// Panics if both timeouts are set and the response timeout is greater
    /// than the body timeout. Since the body timeout defaults to 60 seconds,
    /// setting a longer response timeout requires also raising or unsetting
    /// the body timeout.
    pub fn finish(&self) -> FutioTunables {
        let t = self.template.clone();
        // The ordering invariant only applies when both timeouts are set.
        if let (Some(res), Some(body)) = (t.res_timeout, t.body_timeout) {
            assert!(res <= body,
                    "res_timeout can't be greater than body_timeout");
        }
        t
    }

}

impl Default for FutioTuner {
    fn default() -> Self { FutioTuner::new() }
}