1use std::collections::HashMap;
39use std::fmt::Debug;
40use std::future::Future;
41use std::marker::PhantomData;
42use std::pin::Pin;
43use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
44use std::sync::{Arc, Mutex, RwLock};
45use std::task::{Context, Poll, ready};
46
47use futures::future::Either;
48use pin_project::{pin_project, pinned_drop};
49use tokio::runtime::Handle;
50use tokio::sync::Notify;
51use tracing::{debug, error};
52
53pub use super::errors::SingleflightError;
54use crate::error_printer::ErrorPrinter;
55
56type SingleflightResult<T, E> = Result<T, SingleflightError<E>>;
57type CallMap<T, E> = HashMap<String, Arc<Call<T, E>>>;
58type CallCreate<'a, T, E> = (Arc<Call<T, E>>, CreateGuard<'a, T, E>);
59
60pub trait ResultType: Send + Clone + Sync + Debug {}
67impl<T: Send + Clone + Sync + Debug> ResultType for T {}
68
69pub trait ResultError: Send + Debug + Sync {}
73impl<E: Send + Debug + Sync> ResultError for E {}
74
75pub trait TaskFuture<T, E>: Future<Output = Result<T, E>> + Send {}
79impl<T, E, F: Future<Output = Result<T, E>> + Send> TaskFuture<T, E> for F {}
80
81#[derive(Debug, Clone)]
90struct Call<T, E>
91where
92 T: ResultType,
93 E: ResultError,
94{
95 nt: Arc<Notify>,
97
98 res: Arc<RwLock<Option<SingleflightResult<T, E>>>>,
111
112 num_waiters: Arc<AtomicU16>,
114}
115
116impl<T, E> Call<T, E>
117where
118 T: ResultType,
119 E: ResultError,
120{
121 fn new() -> Self {
122 Self {
123 nt: Arc::new(Notify::new()),
124 res: Arc::new(RwLock::new(None)),
125 num_waiters: Arc::new(AtomicU16::new(0)),
126 }
127 }
128
129 fn complete(&self, res: SingleflightResult<T, E>) {
132 let mut val = self.res.write().unwrap();
134 *val = Some(res);
135 self.nt.notify_waiters();
136 let num_waiters = self.num_waiters.load(Ordering::SeqCst);
137 debug!("Completed Call with: {} waiters", num_waiters);
138 }
139
140 fn get_future(&self) -> impl Future<Output = SingleflightResult<T, E>> + '_ {
143 let res = self.res.read().unwrap();
145 if let Some(result) = res.clone() {
146 debug!("Call already completed");
148 Either::Left(async move { result })
149 } else {
150 self.num_waiters.fetch_add(1, Ordering::SeqCst);
152 debug!("Adding to Call's Notify");
153
154 let notified = self.nt.notified();
158 Either::Right(async move {
159 notified.await;
160 self.get()
161 })
162 }
163 }
164
165 fn get(&self) -> SingleflightResult<T, E> {
168 let res = self.res.read().unwrap();
169 res.clone().unwrap_or(Err(SingleflightError::NoResult))
170 }
171}
172
173#[derive(Debug)]
176pub struct Group<T, E>
177where
178 T: ResultType + 'static,
179 E: ResultError,
180{
181 call_map: Arc<Mutex<CallMap<T, E>>>,
182 _marker: PhantomData<fn(E)>,
183}
184
185impl<T, E: 'static> Default for Group<T, E>
186where
187 T: ResultType + 'static,
188 E: ResultError,
189{
190 fn default() -> Self {
191 Self {
192 call_map: Arc::new(Default::default()),
193 _marker: Default::default(),
194 }
195 }
196}
197
198impl<T, E: 'static> Group<T, E>
199where
200 T: ResultType + 'static,
201 E: ResultError,
202{
203 pub fn new() -> Group<T, E> {
205 Self::default()
206 }
207
208 pub async fn work(
219 &self,
220 key: &str,
221 fut: impl TaskFuture<T, E> + 'static,
222 ) -> (Result<T, SingleflightError<E>>, bool) {
223 let (call, create_guard) = match self.get_call_or_create(key) {
225 Ok((call, create_guard)) => (call, create_guard),
226 Err(err) => return (Err(err), false),
227 };
228 match &create_guard {
230 CreateGuard::Owned(_, _) => {
231 let owner_task = OwnerTask::new(fut, call.clone());
233 let owner_handle = Handle::current().spawn(owner_task);
234
235 match owner_handle.await {
237 Ok(res) => (res, true),
238 Err(e) => (Err(SingleflightError::JoinError(e.to_string())), true),
239 }
240 },
241 CreateGuard::Waiter => (call.get_future().await, false),
242 }
243 }
244
245 pub async fn work_dump_caller_info(
247 &self,
248 key: &str,
249 fut: impl TaskFuture<T, E> + 'static,
250 ) -> Result<T, SingleflightError<E>> {
251 let (result, _) = self.work(key, fut).await;
252 result
253 }
254
255 fn get_call_or_create<'a>(&'a self, key: &'a str) -> Result<CallCreate<'a, T, E>, SingleflightError<E>> {
263 let mut m = self
264 .call_map
265 .lock()
266 .log_error("Failed to lock call map")
267 .map_err(|_| SingleflightError::GroupLockPoisoned)?;
268 if let Some(c) = m.get(key).cloned() {
269 Ok((c, CreateGuard::Waiter))
270 } else {
271 let c = Arc::new(Call::new());
272 let our_call = c.clone();
273 m.insert(key.to_owned(), c);
274 Ok((our_call, CreateGuard::Owned(self, key)))
275 }
276 }
277
278 fn remove_call(&self, key: &str) -> SingleflightResult<(), E> {
281 let mut m = self
282 .call_map
283 .lock()
284 .log_error("Failed to lock call map")
285 .map_err(|_| SingleflightError::GroupLockPoisoned)?;
286 m.remove(key).ok_or(SingleflightError::CallMissing)?;
287 Ok(())
288 }
289}
290
291enum CreateGuard<'a, T, E>
295where
296 T: ResultType + 'static,
297 E: ResultError + 'static,
298{
299 Owned(&'a Group<T, E>, &'a str),
300 Waiter,
301}
302
303impl<T, E> Drop for CreateGuard<'_, T, E>
304where
305 T: ResultType + 'static,
306 E: ResultError + 'static,
307{
308 fn drop(&mut self) {
309 match self {
310 CreateGuard::Owned(group, key) => group
311 .remove_call(key)
312 .inspect_err(|err| error!(?err, "Couldn't remove call from map"))
313 .unwrap(),
314 CreateGuard::Waiter => {},
315 }
316 }
317}
318
319#[pin_project(PinnedDrop)]
331#[must_use = "futures do nothing unless you `.await` or poll them"]
332struct OwnerTask<T, E, F>
333where
334 T: ResultType,
335 E: ResultError,
336 F: TaskFuture<T, E>,
337{
338 #[pin]
339 fut: F,
340 got_response: AtomicBool,
341 call: Arc<Call<T, E>>,
342}
343
344impl<T, E, F> OwnerTask<T, E, F>
345where
346 T: ResultType,
347 E: ResultError,
348 F: TaskFuture<T, E>,
349{
350 fn new(fut: F, call: Arc<Call<T, E>>) -> Self {
351 Self {
352 fut,
353 got_response: AtomicBool::new(false),
354 call,
355 }
356 }
357}
358
359impl<T, E, F> Future for OwnerTask<T, E, F>
360where
361 T: ResultType,
362 E: ResultError,
363 F: TaskFuture<T, E>,
364{
365 type Output = Result<T, SingleflightError<E>>;
366
367 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
368 let this = self.project();
369 let res: Result<T, E> = ready!(this.fut.poll(cx));
370 let res = res.map_err(|e| SingleflightError::InternalError(e));
371 let call = this.call;
373 this.got_response.store(true, Ordering::SeqCst);
374 call.complete(res.clone());
375 Poll::Ready(res)
376 }
377}
378
379#[pinned_drop]
380impl<T, E, F> PinnedDrop for OwnerTask<T, E, F>
381where
382 T: ResultType,
383 E: ResultError,
384 F: TaskFuture<T, E>,
385{
386 fn drop(self: Pin<&mut Self>) {
387 let this = self.project();
390 if !this.got_response.load(Ordering::SeqCst) {
391 let call = this.call;
392 call.complete(Err(SingleflightError::OwnerPanicked));
393 }
394 }
395}
396
397#[cfg(test)]
398pub(crate) mod tests {
399 use std::sync::Arc;
400 use std::sync::atomic::{AtomicU32, Ordering};
401 use std::time::Duration;
402
403 use futures::future::join_all;
404 use tokio::runtime::Handle;
405 use tokio::task::JoinHandle;
406 use tokio::time::timeout;
407
408 use super::super::errors::SingleflightError;
409 use super::{Call, Group, OwnerTask};
410 use crate::core::XetRuntime;
411
412 pub(crate) const WAITER_TIMEOUT: Duration = Duration::from_millis(100);
417
418 const RES: usize = 7;
419
420 async fn return_res() -> Result<usize, ()> {
421 Ok(RES)
422 }
423
424 async fn expensive_fn(x: Arc<AtomicU32>, resp: usize) -> Result<usize, ()> {
425 tokio::time::sleep(Duration::new(1, 0)).await;
426 x.fetch_add(1, Ordering::SeqCst);
427 Ok(resp)
428 }
429
430 #[test]
431 fn test_simple_with_threadpool() {
432 let threadpool = Arc::new(XetRuntime::new().unwrap());
433 let g = Group::new();
434 let res = threadpool
435 .bridge_sync(async move { g.work("key", return_res()).await })
436 .unwrap()
437 .0;
438 let r = res.unwrap();
439 assert_eq!(r, RES);
440 }
441
442 #[tokio::test]
443 async fn test_simple() {
444 let g = Group::new();
445 let res = g.work("key", return_res()).await.0;
446 let r = res.unwrap();
447 assert_eq!(r, RES);
448 }
449
450 #[test]
451 #[cfg_attr(feature = "smoke-test", ignore)]
452 fn test_multiple_threads_with_threadpool() {
453 let times_called = Arc::new(AtomicU32::new(0));
454 let threadpool = Arc::new(XetRuntime::new().unwrap());
455 let g: Arc<Group<usize, ()>> = Arc::new(Group::new());
456 let mut handlers: Vec<JoinHandle<(usize, bool)>> = Vec::new();
457 let threadpool_ = threadpool.clone();
458 let tasks = async move {
459 for _ in 0..10 {
460 let g = g.clone();
461 let counter = times_called.clone();
462 handlers.push(threadpool_.spawn(async move {
463 let tup = g.work("key", expensive_fn(counter, RES)).await;
464 let res = tup.0;
465 let fn_response = res.unwrap();
466 (fn_response, tup.1)
467 }));
468 }
469
470 let num_callers = join_all(handlers)
471 .await
472 .into_iter()
473 .map(|r| r.unwrap())
474 .filter(|(val, is_caller)| {
475 assert_eq!(*val, RES);
476 *is_caller
477 })
478 .count();
479 assert_eq!(1, num_callers);
480 assert_eq!(1, times_called.load(Ordering::SeqCst));
481 };
482 threadpool.bridge_sync(tasks).unwrap();
483 }
484
485 #[tokio::test]
486 #[cfg_attr(feature = "smoke-test", ignore)]
487 async fn test_multiple_threads() {
488 let times_called = Arc::new(AtomicU32::new(0));
489 let g: Arc<Group<usize, ()>> = Arc::new(Group::new());
490 let mut handlers: Vec<JoinHandle<(usize, bool)>> = Vec::new();
491 for _ in 0..10 {
492 let g = g.clone();
493 let counter = times_called.clone();
494 handlers.push(Handle::current().spawn(async move {
495 let tup = g.work("key", expensive_fn(counter, RES)).await;
496 let res = tup.0;
497 let fn_response = res.unwrap();
498 (fn_response, tup.1)
499 }));
500 }
501
502 let num_callers = join_all(handlers)
503 .await
504 .into_iter()
505 .map(|r| r.unwrap())
506 .filter(|(val, is_caller)| {
507 assert_eq!(*val, RES);
508 *is_caller
509 })
510 .count();
511 assert_eq!(1, num_callers);
512 assert_eq!(1, times_called.load(Ordering::SeqCst));
513 }
514
515 #[tokio::test]
516 #[cfg_attr(feature = "smoke-test", ignore)]
517 async fn test_error() {
518 let times_called = Arc::new(AtomicU32::new(0));
519
520 async fn expensive_error_fn(x: Arc<AtomicU32>) -> Result<usize, &'static str> {
521 tokio::time::sleep(Duration::new(1, 500)).await;
522 x.fetch_add(1, Ordering::SeqCst);
523 Err("Error")
524 }
525
526 let g: Arc<Group<usize, &'static str>> = Arc::new(Group::new());
527 let mut handlers = Vec::new();
528
529 for _ in 0..10 {
530 let g = g.clone();
531 let counter = times_called.clone();
532 handlers.push(Handle::current().spawn(async move {
533 let tup = g.work("key", expensive_error_fn(counter)).await;
534 let res = tup.0;
535 assert!(res.is_err());
536 tup.1
537 }));
538 }
539
540 let num_callers = join_all(handlers).await.into_iter().map(|r| r.unwrap()).filter(|b| *b).count();
541 assert_eq!(1, num_callers);
542 assert_eq!(1, times_called.load(Ordering::SeqCst));
543 }
544
545 #[tokio::test]
546 #[cfg_attr(feature = "smoke-test", ignore)]
547 async fn test_multiple_keys() {
548 let times_called_x = Arc::new(AtomicU32::new(0));
549 let times_called_y = Arc::new(AtomicU32::new(0));
550
551 let mut handlers1 = call_success_n_times(5, "key", times_called_x.clone(), 7);
552 let mut handlers2 = call_success_n_times(5, "key2", times_called_y.clone(), 13);
553 handlers1.append(&mut handlers2);
554 let count_x = AtomicU32::new(0);
555 let count_y = AtomicU32::new(0);
556
557 let num_callers = join_all(handlers1)
558 .await
559 .into_iter()
560 .map(|r| r.unwrap())
561 .filter(|(val, is_caller)| {
562 if *val == 7 {
563 count_x.fetch_add(1, Ordering::SeqCst);
564 } else if *val == 13 {
565 count_y.fetch_add(1, Ordering::SeqCst);
566 } else {
567 panic!("joined a number not expected: {}", *val);
568 }
569 *is_caller
570 })
571 .count();
572 assert_eq!(2, num_callers);
573 assert_eq!(5, count_x.load(Ordering::SeqCst));
574 assert_eq!(5, count_y.load(Ordering::SeqCst));
575 assert_eq!(1, times_called_x.load(Ordering::SeqCst));
576 assert_eq!(1, times_called_y.load(Ordering::SeqCst));
577 }
578
579 fn call_success_n_times(times: usize, key: &str, c: Arc<AtomicU32>, val: usize) -> Vec<JoinHandle<(usize, bool)>> {
581 let g: Arc<Group<usize, ()>> = Arc::new(Group::new());
582 let mut handlers = Vec::new();
583 for _ in 0..times {
584 let g = g.clone();
585 let counter = c.clone();
586 let k = key.to_owned();
587 handlers.push(Handle::current().spawn(async move {
588 let tup = g.work(k.as_str(), expensive_fn(counter, val)).await;
589 let res = tup.0;
590 let fn_response = res.unwrap();
591 (fn_response, tup.1)
592 }));
593 }
594 handlers
595 }
596
597 #[tokio::test]
598 async fn test_owner_task_future_impl() {
599 const VAL: i32 = 10;
600 let future = async { Ok::<i32, String>(VAL) };
601 let call = Arc::new(Call::new());
602 let owner_task = OwnerTask::new(future, call.clone());
603 let result = tokio::spawn(owner_task).await;
604 assert_eq!(VAL, result.unwrap().unwrap());
605 assert_eq!(VAL, call.get().unwrap());
606 }
607
608 #[tokio::test]
609 async fn test_owner_task_future_notify() {
610 const VAL: i32 = 10;
611 let future = async { Ok::<i32, String>(VAL) };
612 let call = Arc::new(Call::new());
613 let call_waiter = call.clone();
614 let waiter_task = async move {
615 let waiter_future = call_waiter.get_future();
616 assert_eq!(VAL, waiter_future.await.unwrap());
617 };
618 let waiter_handle = tokio::spawn(waiter_task);
619 let owner_task = OwnerTask::new(future, call.clone());
620 let result = tokio::spawn(owner_task).await;
621 timeout(WAITER_TIMEOUT, waiter_handle).await.unwrap().unwrap();
622 assert_eq!(VAL, result.unwrap().unwrap());
623 assert_eq!(VAL, call.get().unwrap());
624 assert_eq!(1, call.num_waiters.load(Ordering::SeqCst)) }
626
627 #[tokio::test]
628 async fn test_owner_task_future_panic() {
629 let future = async { panic!("failing task") };
630 let call = Arc::new(Call::<i32, String>::new());
631 let call_waiter = call.clone();
632 let waiter_task = async move {
633 let waiter_future = call_waiter.get_future();
634 let result = waiter_future.await;
635 assert!(matches!(result, Err(SingleflightError::OwnerPanicked)));
636 };
637 let waiter_handle = tokio::spawn(waiter_task);
638
639 let owner_task = OwnerTask::new(future, call.clone());
640 let result = tokio::spawn(owner_task).await;
641 assert!(result.is_err());
642 timeout(WAITER_TIMEOUT, waiter_handle).await.unwrap().unwrap();
643 assert_eq!(1, call.num_waiters.load(Ordering::SeqCst)) }
645}
646
647#[cfg(test)]
648mod test_deadlock {
649 use std::collections::HashMap;
650 use std::sync::Arc;
651
652 use futures::StreamExt;
653 use futures::stream::iter;
654 use tests::WAITER_TIMEOUT;
655 use tokio::runtime::Handle;
656 use tokio::sync::mpsc::error::SendError;
657 use tokio::sync::mpsc::{Sender, channel};
658 use tokio::sync::{Mutex, Notify};
659 use tokio::time::timeout;
660
661 use super::{Group, tests};
662
663 #[tokio::test]
664 async fn test_deadlock() {
665 let group: Arc<Group<usize, ()>> = Arc::new(Group::new());
680 let (send1, mut recv1) = channel::<usize>(1);
682 let (send2, mut recv2) = channel::<usize>(1);
683 let vals1: Vec<usize> = vec![1, 2, 3, 4, SHARED_ITEM];
685 let vals2: Vec<usize> = vec![6, 7, SHARED_ITEM, 8, 9];
686
687 let waiters: Arc<Mutex<HashMap<usize, Arc<Notify>>>> = Arc::new(Mutex::new(HashMap::new()));
692 {
693 let mut guard = waiters.lock().await;
694 guard.insert(vals2[1], Arc::new(Notify::new()));
695 guard.insert(SHARED_ITEM, Arc::new(Notify::new()));
696 }
697
698 let t1 = Handle::current().spawn(run_task(1, group.clone(), waiters.clone(), send1, false, vals1.clone()));
700 let t2 = Handle::current().spawn(run_task(2, group.clone(), waiters.clone(), send2, true, vals2.clone()));
701
702 for (i, expected_val) in vals1.into_iter().enumerate() {
704 if i == 3 {
705 println!("[main] notifying val: {}", vals2[1]);
707 let guard = waiters.lock().await;
708 guard.get(&vals2[1]).unwrap().notify_one();
709 println!("[main] notified val: {}", vals2[1])
710 }
711 if i == 4 {
712 println!("[main] notifying val: {}", SHARED_ITEM);
714 let guard = waiters.lock().await;
715 guard.get(&SHARED_ITEM).unwrap().notify_one();
716 println!("[main] notified val: {}", SHARED_ITEM);
717 }
718 println!("[main] getting t1[{}]", i);
719 let res = timeout(WAITER_TIMEOUT, recv1.recv())
720 .await
721 .map_err(|_| format!("Timed out on task1 waiting for val: {}. Likely deadlock.", expected_val));
722 let val = res.unwrap().unwrap();
723 println!("[main] got val: {} from t1[{}]", val, i);
724 assert_eq!(expected_val, val);
725 }
726
727 for expected_val in vals2 {
729 let res = timeout(WAITER_TIMEOUT, recv2.recv())
730 .await
731 .map_err(|_| format!("Timed out on task2 waiting for val: {}. Likely deadlock.", expected_val));
732 let val = res.unwrap().unwrap();
733 assert_eq!(expected_val, val);
734 }
735
736 t1.await.unwrap().unwrap();
738 t2.await.unwrap().unwrap();
739 }
740
741 const SHARED_ITEM: usize = 5;
742
743 async fn run_task(
744 id: i32,
745 g: Arc<Group<usize, ()>>,
746 waiters: Arc<Mutex<HashMap<usize, Arc<Notify>>>>,
747 send_chan: Sender<usize>,
748 should_own: bool,
749 vals: Vec<usize>,
750 ) -> Result<(), SendError<usize>> {
751 let mut strm = iter(vals.into_iter().map(|v| {
753 let g = g.clone();
754 let waiters = waiters.clone();
755 async move {
757 println!("[task: {}] running task for: {}", id, v);
758 let (res, is_owner) = g.work(format!("{}", v).as_str(), run_fut(v, waiters)).await;
759 println!("[task: {}] completed task for: {}, is_owner: {}", id, v, is_owner);
760 if v == SHARED_ITEM {
761 assert_eq!(should_own, is_owner);
762 }
763 res.unwrap()
764 }
765 }))
766 .buffered(3);
767
768 while let Some(val) = strm.next().await {
769 println!("[task: {}] sending next element: {}", id, val);
770 send_chan.send(val).await?;
771 println!("[task: {}] sent next element: {}", id, val);
772 }
773 println!("[task: {}] done executing", id);
774 Ok(())
775 }
776
777 async fn run_fut(v: usize, waiters: Arc<Mutex<HashMap<usize, Arc<Notify>>>>) -> Result<usize, ()> {
778 let waiter = {
779 let x = waiters.lock().await;
780 x.get(&v).cloned()
781 };
782 if let Some(waiter) = waiter {
784 println!("val: {}, waiting for signal", v);
785 waiter.notified().await;
786 println!("val: {}, woke up from signal", v);
787 }
788 Ok(v)
789 }
790}
791
792#[cfg(test)]
793mod test_futures_unordered {
794 use std::future::Future;
795 use std::pin::Pin;
796 use std::sync::Arc;
797 use std::time::Duration;
798
799 use futures_util::TryStreamExt;
800 use futures_util::stream::FuturesUnordered;
801 use tokio::sync::mpsc;
802 use tokio::time::sleep;
803
804 use super::super::errors::SingleflightError;
805 use super::Group;
806
807 type FutType = Pin<Box<dyn Future<Output = Result<(i32, bool), SingleflightError<String>>> + Send>>;
808
809 #[tokio::test]
810 async fn test_dropped_owner() {
811 let group = Arc::new(Group::new());
828
829 let (ready_tx, mut ready_rx) = mpsc::channel(1);
831 let (done_tx, mut done_rx) = mpsc::channel(1);
834
835 let fut_owner = get_fut(group.clone(), "key1", async move {
837 ready_tx.send(true).await.unwrap();
838 sleep(Duration::from_millis(100)).await;
839 done_tx.send(true).await.unwrap();
840 Ok(1)
841 });
842 let fut_waiter =
844 get_fut(group.clone(), "key1", async { Err("Test BUG: waiter should not be called".to_string()) });
845
846 let fut_err = get_fut(group.clone(), "key2", async { Err("failed".to_string()) });
848
849 let handle = tokio::spawn(async move {
851 assert!(ready_rx.recv().await.unwrap());
852 let (i, is_owner) = fut_waiter.await.unwrap();
853 assert!(!is_owner);
854 assert_eq!(i, 1);
855 });
856
857 let futures: Result<Vec<(i32, bool)>, SingleflightError<String>> =
864 FuturesUnordered::from_iter(vec![fut_owner, fut_err]).try_collect().await;
865
866 assert!(futures.is_err());
867 assert!(!group.call_map.lock().unwrap().contains_key("key1"));
869 assert!(done_rx.recv().await.unwrap());
870 handle.await.unwrap();
871
872 let fut_after = get_fut(group, "key1", async { Ok(5) });
875 let (i, is_owner) = fut_after.await.unwrap();
876 assert!(is_owner);
877 assert_eq!(i, 5);
878 }
879
880 fn get_fut(
881 g: Arc<Group<i32, String>>,
882 key: &str,
883 f: impl Future<Output = Result<i32, String>> + Send + 'static,
884 ) -> FutType {
885 let key = key.to_string();
886 Box::pin(async move {
887 let (res, is_owner) = g.work(&key, f).await;
888 let i = res?;
889 Ok((i, is_owner))
890 })
891 }
892}