1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_MPSC_H
#define GRPC_SRC_CORE_LIB_PROMISE_MPSC_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <algorithm>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/wait_set.h"
// Multi producer single consumer inter-activity comms.
namespace grpc_core {
namespace mpscpipe_detail {
// "Center" of the communication pipe.
// Contains sent but not received messages, and open/close state.
template <typename T>
class Center : public RefCounted<Center<T>> {
 public:
  // Construct the center with a maximum queue size.
  // max_queued bounds how many sent-but-not-yet-received items the center
  // will hold before PollSend starts returning Pending.
  explicit Center(size_t max_queued) : max_queued_(max_queued) {}

  // Poll for new items.
  // - Returns true if new items were obtained, in which case they are contained
  //   in dest in the order they were added. Wakes up all pending senders since
  //   there will now be space to send.
  // - If no new items are available, returns false and sets up a waker to be
  //   awoken when more items are available.
  // Whatever dest held on entry is swapped into the internal queue and then
  // cleared, so its storage is reused for subsequent sends.
  // TODO(ctiller): consider the problem of thundering herds here. There may be
  // more senders than there are queue spots, and so the strategy of waking up
  // all senders is ill-advised.
  // That said, some senders may have been cancelled by the time we wake them,
  // and so waking a subset could cause starvation.
  bool PollReceiveBatch(std::vector<T>& dest) {
    ReleasableMutexLock lock(&mu_);
    if (queue_.empty()) {
      receive_waker_ = Activity::current()->MakeNonOwningWaker();
      return false;
    }
    dest.swap(queue_);
    queue_.clear();
    auto wakeups = send_wakers_.TakeWakeupSet();
    // Drop the lock before waking so that no wakeup runs while mu_ is held.
    lock.Release();
    wakeups.Wakeup();
    return true;
  }

  // Poll to send one item.
  // Returns pending if no send slot was available (a waker for the current
  // activity is registered so the send can be retried later).
  // Returns true if the item was sent; t is moved from in that case only.
  // Returns false if the receiver has been closed.
  Poll<bool> PollSend(T& t) {
    ReleasableMutexLock lock(&mu_);
    if (receiver_closed_) return Poll<bool>(false);
    if (queue_.size() < max_queued_) {
      queue_.push_back(std::move(t));
      auto receive_waker = std::move(receive_waker_);
      // Drop the lock before waking so that no wakeup runs while mu_ is held.
      lock.Release();
      receive_waker.Wakeup();
      return Poll<bool>(true);
    }
    send_wakers_.AddPending(Activity::current()->MakeNonOwningWaker());
    return Pending{};
  }

  // Mark that the receiver is closed.
  // Any senders currently parked waiting for queue space are woken so that
  // their next PollSend observes the closed state and resolves to false;
  // without this they would stay pending forever, since no further
  // PollReceiveBatch call will arrive to wake them.
  // Calling this more than once is harmless.
  void ReceiverClosed() {
    ReleasableMutexLock lock(&mu_);
    if (receiver_closed_) return;
    receiver_closed_ = true;
    auto wakeups = send_wakers_.TakeWakeupSet();
    // Drop the lock before waking so that no wakeup runs while mu_ is held.
    lock.Release();
    wakeups.Wakeup();
  }

 private:
  Mutex mu_;
  // Maximum number of items allowed in queue_ at once.
  const size_t max_queued_;
  // Items that have been sent but not yet handed to the receiver.
  std::vector<T> queue_ ABSL_GUARDED_BY(mu_);
  // Set once by ReceiverClosed(); makes every later PollSend return false.
  bool receiver_closed_ ABSL_GUARDED_BY(mu_) = false;
  // Waker for the receiver, armed when it polled and found the queue empty.
  Waker receive_waker_ ABSL_GUARDED_BY(mu_);
  // Wakers for senders that polled and found the queue full.
  WaitSet send_wakers_ ABSL_GUARDED_BY(mu_);
};
} // namespace mpscpipe_detail
// Forward declaration: MpscSender names MpscReceiver<T> as a friend before
// MpscReceiver itself is defined.
template <typename T>
class MpscReceiver;
// Send half of an mpsc pipe.
template <typename T>
class MpscSender {
public:
MpscSender(const MpscSender&) = delete;
MpscSender& operator=(const MpscSender&) = delete;
MpscSender(MpscSender&&) noexcept = default;
MpscSender& operator=(MpscSender&&) noexcept = default;
// Return a promise that will send one item.
// Resolves to true if sent, false if the receiver was closed (and the value
// will never be successfully sent).
auto Send(T t) {
return [this, t = std::move(t)]() mutable { return center_->PollSend(t); };
}
private:
friend class MpscReceiver<T>;
explicit MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)
: center_(std::move(center)) {}
RefCountedPtr<mpscpipe_detail::Center<T>> center_;
};
// Receive half of an mpsc pipe.
template <typename T>
class MpscReceiver {
 public:
  // max_buffer_hint is the maximum number of elements we'd like to buffer.
  // We halve this before passing to Center so that the number there is the
  // maximum number of elements that can be queued in the center of the pipe.
  // The receiver also holds some of the buffered elements (up to half of them!)
  // so the total outstanding is equal to max_buffer_hint (unless it's 1 in
  // which case instantaneously we may have two elements buffered).
  explicit MpscReceiver(size_t max_buffer_hint)
      : center_(MakeRefCounted<mpscpipe_detail::Center<T>>(
            std::max(static_cast<size_t>(1), max_buffer_hint / 2))) {}

  // Tells the center the receiver is gone so senders resolve to false.
  // center_ is null if this receiver has been moved from.
  ~MpscReceiver() {
    if (center_ != nullptr) center_->ReceiverClosed();
  }

  MpscReceiver(const MpscReceiver&) = delete;
  MpscReceiver& operator=(const MpscReceiver&) = delete;

  // Only movable until it's first polled, and so we don't need to contend with
  // a non-empty buffer during a legal move!
  MpscReceiver(MpscReceiver&& other) noexcept
      : center_(std::move(other.center_)) {
    GPR_DEBUG_ASSERT(other.buffer_.empty());
  }

  // Move assignment. The pipe this receiver previously owned (if any) is
  // closed first, exactly as the destructor would do; otherwise its senders
  // would never learn that nobody is receiving any more.
  MpscReceiver& operator=(MpscReceiver&& other) noexcept {
    GPR_DEBUG_ASSERT(other.buffer_.empty());
    // Guard against self-assignment so we never close our own live center.
    if (this != &other) {
      if (center_ != nullptr) center_->ReceiverClosed();
      center_ = std::move(other.center_);
    }
    return *this;
  }

  // Construct a new sender for this receiver.
  MpscSender<T> MakeSender() { return MpscSender<T>(center_); }

  // Return a promise that will resolve to the next item (and remove said item).
  // Items already pulled into the local buffer are served first; once those
  // are exhausted the center is polled for a fresh batch. Resolves Pending if
  // neither source has an item.
  auto Next() {
    return [this]() -> Poll<T> {
      if (buffer_it_ != buffer_.end()) {
        return Poll<T>(std::move(*buffer_it_++));
      }
      if (center_->PollReceiveBatch(buffer_)) {
        // buffer_ now holds the new batch; restart iteration from its front.
        buffer_it_ = buffer_.begin();
        return Poll<T>(std::move(*buffer_it_++));
      }
      return Pending{};
    };
  }

 private:
  // Received items. We move out of here one by one, but don't resize the
  // vector. Instead, when we run out of items, we poll the center for more -
  // which swaps this buffer in for the new receive queue and clears it.
  // In this way, upon hitting a steady state the queue ought to be allocation
  // free.
  std::vector<T> buffer_;
  // Next item of buffer_ to hand out; equals buffer_.end() when drained.
  typename std::vector<T>::iterator buffer_it_ = buffer_.end();
  // Shared state of the pipe; null after this receiver has been moved from.
  RefCountedPtr<mpscpipe_detail::Center<T>> center_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_MPSC_H