1use std::future::Future;
2use std::mem::MaybeUninit;
3use std::pin::Pin;
4use std::ptr;
5use std::ptr::NonNull;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering::{self, *};
8use std::sync::Mutex;
9use std::task::{Context, Poll, Waker};
10
11use crate::parker::{Parking, WeakOrdering};
12
/// A type that embeds a [`Link`] and can therefore be threaded onto an
/// intrusive doubly-linked [`List`].
trait Node {
    /// Returns the neighbour pointers embedded in this node.
    fn link(&mut self) -> &mut Link<Self>;
}
16
/// Neighbour pointers stored inside a node of an intrusive doubly-linked list.
struct Link<N: ?Sized> {
    /// Pointer to the next node, or `None` when there is none.
    next: Option<NonNull<N>>,
    /// Pointer to the previous node, or `None` when there is none.
    prev: Option<NonNull<N>>,
}

impl<T> Default for Link<T> {
    /// Creates a link that has no neighbours.
    fn default() -> Self {
        let (next, prev) = (None, None);
        Link { next, prev }
    }
}
27
/// Head and tail pointers of an intrusive doubly-linked list.
///
/// The list does not own its nodes; it only records pointers to them.
struct List<N: ?Sized> {
    /// First node of the list, or `None` when the list is empty.
    head: Option<NonNull<N>>,
    /// Last node of the list, or `None` when the list is empty.
    tail: Option<NonNull<N>>,
}

impl<T> Default for List<T> {
    /// Creates an empty list.
    fn default() -> Self {
        let (head, tail) = (None, None);
        List { head, tail }
    }
}
38
39impl<T: Node> List<T> {
40 pub fn push_front(&mut self, node: &mut T) {
41 let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) };
42 if let Some(mut head) = self.head {
43 unsafe {
44 head.as_mut().link().prev = Some(ptr);
45 }
46 }
47 let link = node.link();
48 link.next = self.head;
49 link.prev = None;
50 self.head = Some(ptr);
51 if self.tail.is_none() {
52 self.tail = self.head;
53 }
54 }
55
56 pub fn pop_back<'a>(&mut self) -> Option<&'a mut T> {
57 let node = match self.tail {
58 None => return None,
59 Some(mut ptr) => unsafe { ptr.as_mut() },
60 };
61 self.tail = node.link().prev;
62 match self.tail {
63 None => self.head = None,
64 Some(mut ptr) => unsafe { ptr.as_mut().link().next = None },
65 }
66 Some(node)
67 }
68
69 pub fn unlink(&mut self, node: &mut T) -> bool {
70 let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) };
71 let link = node.link();
72
73 if let Some(mut next) = link.next {
74 unsafe { next.as_mut().link().prev = link.prev };
75 } else if self.tail == Some(ptr) {
76 self.tail = link.prev;
77 } else {
78 return false;
79 }
80
81 if let Some(mut prev) = link.prev {
82 unsafe { prev.as_mut().link().next = link.next };
83 } else if self.head == Some(ptr) {
84 self.head = link.next;
85 } else {
86 return false;
87 }
88
89 link.next = None;
90 link.prev = None;
91
92 true
93 }
94
95 pub fn is_empty(&self) -> bool {
96 self.head.is_none()
97 }
98}
99
100struct GuardedList<'a, T> {
101 empty: bool,
102 guard: &'a mut T,
103}
104
105impl<'a, T: Node> GuardedList<'a, T> {
106 pub fn new(list: List<T>, guard: &'a mut T) -> Self {
107 let ptr = unsafe { NonNull::new_unchecked(guard as *mut T) };
108 let link = guard.link();
109 if list.is_empty() {
110 link.next = Some(ptr);
111 link.prev = Some(ptr);
112 } else {
113 link.next = list.head;
114 link.prev = list.tail;
115 unsafe {
116 list.head.unwrap_unchecked().as_mut().link().prev = Some(ptr);
117 list.tail.unwrap_unchecked().as_mut().link().next = Some(ptr);
118 }
119 }
120 Self { empty: false, guard }
121 }
122
123 pub fn pop_back<'b>(&mut self) -> Option<&'b mut T> {
124 let addr = self.guard as *mut _;
125 let link = self.guard.link();
126 let last = unsafe { link.prev.unwrap_unchecked().as_mut() };
127 if ptr::addr_eq(addr, last) {
128 self.empty = true;
129 return None;
130 }
131 link.prev = last.link().prev;
132 last.link().next = unsafe { Some(NonNull::new_unchecked(addr)) };
133 Some(last)
134 }
135
136 pub fn is_empty(&self) -> bool {
137 self.empty
138 }
139}
140
141struct WaiterList<'a> {
142 list: GuardedList<'a, Waiter>,
143 round: Round,
144 notify: &'a Notify,
145}
146
147impl<'a> WaiterList<'a> {
148 pub fn new(list: GuardedList<'a, Waiter>, round: Round, notify: &'a Notify) -> Self {
149 Self { list, round, notify }
150 }
151
152 pub fn pop_back<'b>(&mut self, _lock: &mut std::sync::MutexGuard<'_, List<Waiter>>) -> Option<&'b mut Waiter> {
153 self.list.pop_back()
154 }
155}
156
157impl Drop for WaiterList<'_> {
158 fn drop(&mut self) {
159 if self.list.is_empty() {
160 return;
161 }
162 let _lock = self.notify.list.lock().unwrap();
163 while let Some(waiter) = self.list.pop_back() {
164 waiter.notification.store(self.round.into_notification(NotificationKind::All), Release);
165 }
166 }
167}
168
/// Mask selecting the two lowest bits of a packed word; they carry the status
/// of a state or the kind of a notification.
const STATUS_MASK: usize = 3usize;

/// Value added to a packed word to advance its round by one (the lowest bit
/// above the status bits).
const ROUND_UNIT: usize = STATUS_MASK + 1;
/// Mask selecting the round bits of a packed word.
const ROUND_MASK: usize = !STATUS_MASK;
173
174#[derive(Copy, Clone, Debug, PartialEq)]
175struct Round(usize);
176
177impl Round {
178 const ZERO: Round = Self(0);
179
180 pub fn new() -> Self {
181 Self(ROUND_UNIT)
182 }
183
184 pub fn into_notification(self, kind: NotificationKind) -> Notification {
185 Notification { kind, round: self }
186 }
187
188 pub fn next(self) -> Self {
189 Self(self.0.wrapping_add(ROUND_UNIT))
190 }
191
192 pub fn into(self) -> usize {
193 self.0
194 }
195
196 pub fn from(i: usize) -> Self {
197 Self(i & ROUND_MASK)
198 }
199}
200
201#[derive(Clone, Copy, Debug, PartialEq)]
202struct State {
203 round: Round,
204 status: Status,
205}
206
207#[derive(Clone, Copy, Debug, PartialEq)]
208#[repr(usize)]
209enum Status {
210 Idle = 0,
211 Waiting = 1,
212 Notified = 2,
213}
214
215impl State {
216 pub fn new() -> Self {
217 Self { round: Round::new(), status: Status::Idle }
218 }
219
220 pub fn with_status(self, status: Status) -> Self {
221 Self { round: self.round, status }
222 }
223
224 pub fn with_round(self, round: Round) -> Self {
225 Self { round, status: self.status }
226 }
227
228 pub fn next_round(self) -> Self {
229 self.with_round(self.round.next())
230 }
231}
232
233struct AtomicState(AtomicUsize);
234
235impl Default for AtomicState {
236 fn default() -> Self {
237 Self::new(State::new())
238 }
239}
240
241impl AtomicState {
242 pub fn new(state: State) -> Self {
243 Self(AtomicUsize::new(state.into()))
244 }
245
246 pub fn store(&self, state: State, ordering: Ordering) {
247 self.0.store(state.into(), ordering)
248 }
249
250 pub fn load(&self, ordering: Ordering) -> State {
251 let u = self.0.load(ordering);
252 State::from(u)
253 }
254
255 pub fn compare_exchange(
256 &self,
257 current: State,
258 new: State,
259 success: Ordering,
260 failure: Ordering,
261 ) -> Result<State, State> {
262 match self.0.compare_exchange(current.into(), new.into(), success, failure) {
263 Ok(_) => Ok(current),
264 Err(updated) => Err(State::from(updated)),
265 }
266 }
267}
268
269impl From<State> for usize {
270 fn from(state: State) -> usize {
271 state.round.into() | state.status as usize
272 }
273}
274
275impl From<usize> for State {
276 fn from(i: usize) -> Self {
277 let status = i & STATUS_MASK;
278 Self { round: Round::from(i), status: unsafe { std::mem::transmute::<usize, Status>(status) } }
279 }
280}
281
282#[derive(Clone, Copy, PartialEq)]
283#[repr(usize)]
284enum NotificationKind {
285 One = 0,
286 All = 1,
287}
288
289#[derive(Clone, Copy)]
290struct Notification {
291 kind: NotificationKind,
292 round: Round,
293}
294
295impl From<Notification> for usize {
296 fn from(notification: Notification) -> usize {
297 notification.round.into() | notification.kind as usize
298 }
299}
300
301impl From<usize> for Notification {
302 fn from(u: usize) -> Self {
303 let kind = u & STATUS_MASK;
304 Self { kind: unsafe { std::mem::transmute::<usize, NotificationKind>(kind) }, round: Round::from(u) }
305 }
306}
307
308#[derive(Default)]
309struct AtomicNotification(AtomicUsize);
310
311impl AtomicNotification {
312 pub fn clear(&mut self) {
313 self.0.store(0, Relaxed)
314 }
315
316 pub fn take(&mut self) -> Option<Notification> {
317 let notification = std::mem::take(self);
318 notification.load(Relaxed)
319 }
320
321 pub fn load(&self, ordering: Ordering) -> Option<Notification> {
322 match self.0.load(ordering) {
323 0 => None,
324 u => Some(Notification::from(u)),
325 }
326 }
327
328 pub fn store(&self, notification: Notification, ordering: Ordering) {
329 self.0.store(notification.into(), ordering)
330 }
331}
332
/// Notifies one or all tasks waiting on a [`Notified`] future.
///
/// Waiters are kept in an intrusive list behind a mutex, while the round and
/// status live in a single atomic word so that the uncontended paths can avoid
/// the lock.
#[derive(Default)]
pub struct Notify {
    // Waiters queued in the current round; new waiters are pushed at the front
    // and notified from the back.
    list: Mutex<List<Waiter>>,
    // Packed round and status (idle / has waiters / holds a stored permit).
    state: AtomicState,
}

// NOTE(review): required because `List<Waiter>` holds raw pointers; soundness
// relies on the list only being touched under `list`'s mutex — confirm.
unsafe impl Send for Notify {}
unsafe impl Sync for Notify {}
368
369impl Notify {
370 pub fn new() -> Self {
372 Self::default()
373 }
374
375 pub fn notified(&self) -> Notified<'_> {
377 let round = self.round();
378 Notified { notify: self, stage: Stage::default(), round, waiter: Waiter::default() }
379 }
380
381 pub fn notify_one(&self) {
383 let state = self.state.load(SeqCst);
384 self.notify_one_in_round(state.round, state);
385 }
386
387 pub fn notify_all(&self) {
389 let mut state = self.state.load(SeqCst);
390 loop {
391 while state.status != Status::Waiting {
392 match self.state.compare_exchange(state, state.next_round().with_status(Status::Idle), Release, Relaxed)
393 {
394 Ok(_) => return,
395 Err(updated) => state = updated,
396 }
397 }
398 let mut list = self.list.lock().unwrap();
399 state = self.state.load(Relaxed);
400 if state.status != Status::Waiting {
401 drop(list);
402 continue;
403 }
404
405 self.state.store(state.next_round().with_status(Status::Idle), Release);
407
408 let mut guard = Waiter::default();
409 let mut wakers = WakerList::new();
410 let mut waiters =
411 WaiterList::new(GuardedList::new(std::mem::take(&mut list), &mut guard), state.round, self);
412
413 'list: loop {
414 while !wakers.is_full() {
415 let Some(waiter) = waiters.pop_back(&mut list) else {
416 break 'list;
417 };
418 let waker = unsafe { waiter.parking.unpark() };
419 waiter.notification.store(state.round.into_notification(NotificationKind::All), Release);
420 if let Some(waker) = waker {
421 wakers.push(waker)
422 }
423 }
424 drop(list);
425 wakers.wake();
426 list = self.list.lock().unwrap();
427 }
428 drop(list);
429 wakers.wake();
430 return;
431 }
432 }
433
434 fn remove(&self, waiter: &mut Waiter) {
435 let notification = match waiter.notification.load(Acquire) {
436 None => {
437 let mut list = self.list.lock().unwrap();
438 if list.unlink(waiter) && list.is_empty() {
439 let state = self.state.load(Relaxed);
440 if state.status == Status::Waiting {
441 self.state.store(state.with_status(Status::Idle), Relaxed);
442 }
443 }
444 drop(list);
445 let Some(notification) = waiter.notification.load(Relaxed) else {
447 return;
448 };
449 notification
450 },
451 Some(notification) => notification,
452 };
453 if notification.kind == NotificationKind::One {
454 self.release_notification(notification.round);
455 }
456 }
457
458 fn poll(&self, waiter: &mut Waiter, round: Round) -> Poll<Notification> {
459 let mut state = self.state.load(SeqCst);
460 let round = if round == Round::ZERO { state.round } else { round };
461 loop {
462 if state.round != round {
463 return Poll::Ready(round.into_notification(NotificationKind::All));
464 }
465 if state.status != Status::Notified {
466 break;
467 }
468 match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Acquire) {
470 Ok(_) => return Poll::Ready(state.round.into_notification(NotificationKind::One)),
471 Err(updated) => state = updated,
472 }
473 }
474 let mut list = self.list.lock().unwrap();
475 state = self.state.load(SeqCst);
476 loop {
477 if state.round != round {
478 drop(list);
479 return Poll::Ready(round.into_notification(NotificationKind::All));
480 }
481 match state.status {
482 Status::Waiting => break,
483 Status::Idle => {
484 match self.state.compare_exchange(state, state.with_status(Status::Waiting), Relaxed, Relaxed) {
485 Ok(_) => break,
486 Err(updated) => state = updated,
487 }
488 },
489 Status::Notified => {
490 match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Relaxed) {
491 Ok(_) => {
492 drop(list);
493 return Poll::Ready(state.round.into_notification(NotificationKind::One));
494 },
495 Err(updated) => state = updated,
496 }
497 },
498 }
499 }
500 list.push_front(waiter);
501 drop(list);
502 Poll::Pending
503 }
504
505 fn notify_one_in_round(&self, round: Round, mut state: State) {
506 loop {
507 loop {
508 if state.round != round {
511 return;
512 }
513 if state.status == Status::Waiting {
514 break;
515 }
516 match self.state.compare_exchange(state, state.with_status(Status::Notified), Release, Relaxed) {
518 Ok(_) => return,
519 Err(updated) => state = updated,
520 }
521 }
522 let mut list = self.list.lock().unwrap();
523 let state = self.state.load(Relaxed);
524 if state.round != round {
525 return;
526 }
527 if state.status != Status::Waiting {
528 drop(list);
529 continue;
530 }
531 let waiter = list.pop_back().unwrap();
532 let waker = unsafe { waiter.parking.unpark() };
533 waiter.notification.store(state.round.into_notification(NotificationKind::One), Release);
534 if list.is_empty() {
535 self.state.store(state.with_status(Status::Idle), Relaxed);
536 }
537 drop(list);
538 if let Some(waker) = waker {
539 waker.wake();
540 }
541 return;
542 }
543 }
544
545 fn round(&self) -> Round {
546 self.state.load(SeqCst).round
547 }
548
549 fn release_notification(&self, round: Round) {
550 let state = self.state.load(SeqCst);
551 self.notify_one_in_round(round, state);
552 }
553}
554
/// Fixed-capacity stack of wakers, collected so they can be invoked later.
struct WakerList {
    /// Number of initialised slots; slots `0..next` hold a waker.
    next: usize,
    wakers: [MaybeUninit<Waker>; 32],
}

impl WakerList {
    /// Creates an empty list.
    pub fn new() -> Self {
        WakerList { wakers: std::array::from_fn(|_| MaybeUninit::uninit()), next: 0 }
    }

    /// Returns `true` when no further waker can be pushed.
    pub fn is_full(&self) -> bool {
        self.wakers.len() == self.next
    }

    /// Appends a waker. The list must not be full.
    pub fn push(&mut self, waker: Waker) {
        debug_assert!(self.next < self.wakers.len());
        let slot = &mut self.wakers[self.next];
        slot.write(waker);
        self.next += 1;
    }

    /// Invokes and removes every stored waker, most recently pushed first.
    pub fn wake(&mut self) {
        while let Some(index) = self.next.checked_sub(1) {
            // Shrink first, so a panicking waker cannot be read a second time.
            self.next = index;
            // SAFETY: slots below the previous `next` were written by `push`
            // and this slot has not been read since.
            let waker = unsafe { self.wakers[index].assume_init_read() };
            waker.wake();
        }
    }
}

impl Drop for WakerList {
    /// Drops any wakers that were pushed but never woken.
    fn drop(&mut self) {
        let initialised = std::mem::replace(&mut self.next, 0);
        for slot in self.wakers[..initialised].iter_mut().rev() {
            // SAFETY: slots `0..initialised` were written by `push` and have
            // not been read out by `wake`.
            unsafe { slot.assume_init_drop() };
        }
    }
}
594
/// Per-future record that is linked into the waiter list of a `Notify`.
struct Waiter {
    // Intrusive list pointers; see `Node for Waiter`.
    link: Link<Waiter>,
    // Parks and unparks the waker of the owning future.
    // NOTE(review): semantics are defined in `crate::parker`, not shown here.
    parking: Parking<WeakOrdering>,

    // Notification delivered to this waiter, if any.
    notification: AtomicNotification,
}

impl Default for Waiter {
    /// Creates an unlinked waiter without a notification.
    fn default() -> Self {
        Self { link: Link::default(), parking: Parking::new(), notification: AtomicNotification::default() }
    }
}

impl Node for Waiter {
    /// Exposes the embedded link so the waiter can sit in a `List<Waiter>`.
    fn link(&mut self) -> &mut Link<Waiter> {
        &mut self.link
    }
}
615
/// Progress of a `Notified` future.
#[repr(usize)]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
enum Stage {
    /// Not yet polled or enabled.
    #[default]
    Init = 0,
    /// The first poll returned pending; a notification is awaited.
    Waiting = 1,
    /// A notification has been obtained; further polls return ready.
    Finished = 2,
}
624
/// Future returned by `Notify::notified`; completes once it is notified.
///
/// While waiting, `waiter` is linked into the list of `notify` by address.
/// NOTE(review): nothing in the type prevents moving the value in that stage;
/// confirm callers keep it in place until it completes or is dropped.
pub struct Notified<'a> {
    // The `Notify` this future waits on.
    notify: &'a Notify,

    // Where this future is in its lifecycle.
    stage: Stage,
    // Round this future belongs to; `Round::ZERO` after `detach`.
    round: Round,

    // Record that gets linked into the waiter list of `notify`.
    waiter: Waiter,
}

// NOTE(review): `Waiter` holds raw pointers, hence the manual impls; soundness
// relies on the locking discipline in `Notify` — confirm.
unsafe impl Send for Notified<'_> {}
unsafe impl Sync for Notified<'_> {}
637
638impl<'a> Notified<'a> {
639 pub fn enable(mut self: Pin<&mut Self>) {
647 if self.stage != Stage::Init {
648 return;
649 }
650 let round = self.round;
651 if let Poll::Ready(notification) = self.notify.poll(&mut self.waiter, round) {
652 self.stage = Stage::Finished;
653 self.waiter.notification.store(notification, Relaxed);
654 } else {
655 self.stage = Stage::Waiting;
656 }
657 }
658
659 pub fn detach(mut self) -> Notified<'a> {
662 self.round = Round::ZERO;
663 self
664 }
665}
666
impl Future for Notified<'_> {
    type Output = ();

    /// Completes once a notification is available for this future.
    ///
    /// The first poll goes through `Notify::poll`, which may complete at once
    /// or queue the waiter. Later polls check the waiter's notification slot
    /// and otherwise park the task's waker.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let round = self.round;
        match self.stage {
            Stage::Init => match self.notify.poll(&mut self.waiter, round) {
                Poll::Pending => self.stage = Stage::Waiting,
                Poll::Ready(_) => {
                    // Consumed right away; nothing is left in the waiter for
                    // `drop` to pass on.
                    self.stage = Stage::Finished;
                    return Poll::Ready(());
                },
            },
            Stage::Waiting => match self.waiter.notification.load(Acquire) {
                None => {},
                Some(_) => {
                    self.waiter.notification.clear();
                    self.stage = Stage::Finished;
                    return Poll::Ready(());
                },
            },
            Stage::Finished => {
                // Clearing marks a notification kept by `enable` as consumed.
                self.waiter.notification.clear();
                return Poll::Ready(());
            },
        }
        debug_assert_eq!(self.stage, Stage::Waiting);
        // SAFETY: NOTE(review): `Parking::park`'s contract is not visible in
        // this file — confirm against `crate::parker`.
        if unsafe { self.waiter.parking.park(cx.waker()).is_ready() } {
            // The notifier unparks before it stores the notification, so the
            // store may not be visible yet; spin until it is.
            while self.waiter.notification.load(Acquire).is_none() {
                std::hint::spin_loop();
            }
            self.waiter.notification.clear();
            self.stage = Stage::Finished;
            return Poll::Ready(());
        }
        Poll::Pending
    }
}
706
707impl Drop for Notified<'_> {
708 fn drop(&mut self) {
709 match self.stage {
710 Stage::Init => {},
711 Stage::Waiting => self.notify.remove(&mut self.waiter),
712 Stage::Finished => {
713 if let Some(Notification { round, kind: NotificationKind::One }) = self.waiter.notification.take() {
714 self.notify.release_notification(round);
715 }
716 },
717 };
718 }
719}
720
#[cfg(test)]
mod tests {
    use std::pin::{pin, Pin};

    use asyncs::select;

    use super::Notify;

    // `notify_one` completes waiters one at a time, oldest first.
    #[asyncs::test]
    async fn notify_one_simple() {
        let notify = Notify::new();

        let mut notified1 = notify.notified();
        let mut notified2 = notify.notified();
        // Nothing has been notified yet, so neither future may complete.
        select! {
            biased;
            default => {},
            _ = &mut notified1 => unreachable!(),
            _ = &mut notified2 => unreachable!(),
        }

        notify.notify_one();

        // Only the first waiter is expected to complete.
        select! {
            biased;
            default => unreachable!(),
            _ = &mut notified2 => unreachable!(),
            _ = &mut notified1 => {}
        }

        notify.notify_one();
        select! {
            default => unreachable!(),
            _ = &mut notified2 => {},
        }
    }

    // A future registered through `enable` receives the notification ahead of
    // one that was polled later.
    #[asyncs::test]
    async fn notify_one_enabled() {
        let notify = Notify::new();
        let notified1 = notify.notified();
        let mut notified1 = pin!(notified1);
        let mut notified2 = notify.notified();

        notified1.as_mut().enable();
        select! {
            default => {},
            _ = &mut notified2 => unreachable!(),
        }

        notify.notify_one();

        notified1.await;

        select! {
            default => {},
            _ = &mut notified2 => unreachable!(),
        }
    }

    // Two `notify_one` calls without waiters store a single permit.
    #[asyncs::test]
    async fn notify_one_permit_does_not_acculumate() {
        let notify = Notify::new();

        let notified1 = notify.notified();
        let notified2 = notify.notified();

        notify.notify_one();
        notify.notify_one();

        select! {
            default => unreachable!(),
            _ = notified1 => {},
        };
        select! {
            default => {},
            _ = notified2 => unreachable!(),
        };
    }

    // A permit taken by polling is gone, even after the future is dropped.
    #[asyncs::test]
    async fn notify_one_permit_consumed_by_poll() {
        let notify = Notify::new();
        let mut notified1 = notify.notified();
        let notified2 = notify.notified();

        notify.notify_one();

        select! {
            default => unreachable!(),
            _ = &mut notified1 => {},
        };
        drop(notified1);

        select! {
            default => {},
            _ = notified2 => unreachable!(),
        };
    }

    // A permit obtained by `enable` but never polled is passed on when the
    // future is dropped.
    #[asyncs::test]
    async fn notify_one_permit_doesnot_consumed_by_enable() {
        let notify = Notify::new();
        let mut notified1 = notify.notified();
        let notified2 = notify.notified();

        notify.notify_one();

        unsafe {
            Pin::new_unchecked(&mut notified1).enable();
        }
        drop(notified1);

        select! {
            default => unreachable!(),
            _ = notified2 => {},
        };
    }

    // A waiter notified by `notify_one` that is dropped before observing the
    // notification hands it to the next future.
    #[asyncs::test]
    async fn notify_one_permit_unconsumed_resumed_on_drop() {
        let notify = Notify::new();

        let mut notified1 = notify.notified();
        select! {
            default => {},
            _ = &mut notified1 => unreachable!(),
        };

        notify.notify_one();
        drop(notified1);

        let notified2 = notify.notified();
        select! {
            default => unreachable!(),
            _ = notified2 => {},
        };
    }

    // An unconsumed notification is not handed over once `notify_all` has
    // started a new round.
    #[asyncs::test]
    async fn notify_one_permit_does_not_resumed_cross_round() {
        let notify = Notify::new();

        let mut notified1 = notify.notified();
        select! {
            default => {},
            _ = &mut notified1 => unreachable!(),
        };

        notify.notify_one();
        notify.notify_all();
        drop(notified1);

        let notified2 = notify.notified();
        select! {
            default => {},
            _ = notified2 => unreachable!(),
        };
    }

    // `notify_all` completes futures created before it, even unpolled ones,
    // unless they were detached; detached futures need a later `notify_all`.
    #[asyncs::test]
    async fn notify_all_simple() {
        let notify = Notify::new();

        let mut notified1 = notify.notified().detach();
        let mut notified2 = notify.notified().detach();
        let mut notified3 = notify.notified();

        notify.notify_all();

        select! {
            biased;
            default => unreachable!(),
            _ = &mut notified1 => unreachable!("not ready"),
            _ = &mut notified2 => unreachable!("not ready"),
            _ = &mut notified3 => {},
        };

        notify.notify_all();

        select! {
            default => unreachable!(),
            _ = &mut notified1 => {},
        };

        select! {
            default => unreachable!(),
            _ = &mut notified2 => {},
        };
    }

    // `notify_all` completes a future that was registered through `enable`.
    #[asyncs::test]
    async fn notify_all_enabled() {
        let notify = Notify::new();
        let notified = notify.notified();

        let mut notified = pin!(notified);
        notified.as_mut().enable();

        notify.notify_all();

        select! {
            default => unreachable!(),
            _ = notified => {},
        };
    }

    // `notify_all` discards a permit stored by an earlier `notify_one`.
    #[asyncs::test]
    async fn notify_all_ruin_permit() {
        let notify = Notify::new();

        let notified = notify.notified().detach();

        notify.notify_one();
        notify.notify_all();

        select! {
            default => {},
            _ = notified => unreachable!(),
        }
    }

    // Dropping waiting futures unlinks them, so a later `notify_all` must not
    // touch them. The futures are dropped in place and then forgotten so that
    // they are not moved while linked.
    #[asyncs::test]
    async fn notify_unlink() {
        let notify = Notify::new();

        let mut notified1 = notify.notified();
        let mut notified2 = notify.notified();

        select! {
            default => {},
            _ = &mut notified1 => unreachable!(),
            _ = &mut notified2 => unreachable!(),
        }

        let mut notified3 = notify.notified();
        unsafe { Pin::new_unchecked(&mut notified3).enable() };

        unsafe {
            std::ptr::drop_in_place(&mut notified1);
        }
        unsafe {
            std::ptr::drop_in_place(&mut notified2);
        }
        unsafe {
            std::ptr::drop_in_place(&mut notified3);
        }

        std::mem::forget(notified1);
        std::mem::forget(notified2);
        std::mem::forget(notified3);

        notify.notify_all();
    }
}