#![doc(html_root_url = "https://docs.rs/corona/0.2.1/corona/")]

//! A library combining futures and coroutines.
//!
//! The current aim of Rust in regards to asynchronous programming is on
//! [`futures`](https://crates.io/crates/futures). They have many good properties. However, they
//! tend to result in very functional-looking code (eg. monadic chaining of closures through their
//! modifiers). Some tasks are more conveniently done through imperative approaches.
//!
//! The aim of this library is to integrate coroutines with futures. It is possible to start a
//! coroutine. The coroutine can wait for a future to complete (which'll suspend its execution, but
//! will not block the thread ‒ the execution will switch to other futures or coroutines on the
//! same thread). A spawned coroutine is represented through a handle that acts as a future,
//! representing its completion. Therefore, other coroutines can wait for it, or it can be used in
//! the usual functional way and composed with other futures.
//!
//! Unlike the coroutines planned for core Rust, these coroutines are stack-full (eg. you can
//! suspend them from within a deep stack frame) and they are available now.
//!
//! # The cost
//!
//! The coroutines are *not* zero cost, at least not now. There are these costs:
//!
//! * Each coroutine needs a stack. The stack takes some space and is allocated dynamically.
//!   Furthermore, a stack can't be allocated through the usual allocator, but is mapped directly
//!   by the OS, and manipulating the memory mapping of pages is a relatively expensive operation
//!   (a syscall, the TLB needs to be flushed, ...). The stacks are cached and reused, but still,
//!   creating them has a cost.
//! * Each wait for a future inside a coroutine allocates dynamically (because it spawns a task
//!   inside [Tokio's](https://crates.io/crates/tokio-core) reactor core).
//!
//! Some of these costs might be mitigated or lowered in the future, but for now, expect to get
//! somewhat lower performance with coroutines compared to using only futures.
//!
//! # API Stability
//!
//! Currently, the crate is in an experimental state. It exists mostly as research into whether it
//! is possible to provide something like async/await in Rust and integrate it with the current
//! asynchronous stack, without adding explicit support to the language.
//!
//! It is obviously possible, but the ergonomics is something that needs some work. Therefore,
//! expect the API to change, possibly in large ways.
//!
//! Still, if you want to test it, it should probably work and might be useful. It is experimental
//! in the sense that the API is not stabilized, but it is not expected to eat data or crash.
//!
//! # Known problems
//!
//! These are the problems I'm aware of and for which I want to find a solution some day.
//!
//! * The current API is probably inconvenient.
//! * Many abstractions are missing. Things like waiting for a future with a timeout, or waiting
//!   for the first of many futures or streams, would come in handy.
//! * Many places have `'static` bounds on the types, even though these shouldn't be needed in
//!   theory.
//! * The relation with unwind safety is unclear.
//! * No support for threads (probably not even possible ‒ Rust's type system doesn't expect a
//!   stack to move from one thread to another).
//! * It relies on tokio. It would be great if it worked with other future executors as well.
//! * Cleaning up of stacks (and things on them) when the coroutines didn't finish yet is done
//!   through panicking. This has some ugly side effects.
//! * It is possible to create a deadlock when moving the driving tokio core inside a coroutine,
//!   like this (eg. this is an example of what *not* to do):
//!
//! ```rust,no_run
//! # extern crate corona;
//! # extern crate futures;
//! # extern crate tokio_core;
//! # use std::time::Duration;
//! # use corona::Coroutine;
//! # use futures::Future;
//! # use futures::unsync::oneshot;
//! # use tokio_core::reactor::{Core, Timeout};
//! #
//! # fn main() {
//! let mut core = Core::new().unwrap();
//! let (sender, receiver) = oneshot::channel();
//! let handle = core.handle();
//! let c = Coroutine::with_defaults(handle.clone(), move |_await| {
//!     core.run(receiver).unwrap();
//! });
//! Coroutine::with_defaults(handle, |await| {
//!     let timeout = Timeout::new(Duration::from_millis(50), await.handle()).unwrap();
//!     await.future(timeout).unwrap();
//!     drop(sender.send(42));
//! });
//! c.wait().unwrap();
//! # }
//! ```
//!
//! # Contribution
//!
//! All kinds of contributions are welcome, including reporting bugs, improving the documentation,
//! submitting code, etc. However, the most valuable contribution for now would be trying it out
//! and providing some feedback ‒ if the thing works, where the API needs improvements, etc.
//!
//! # Examples
//!
//! One that shows the API.
//!
//! ```
//! # extern crate corona;
//! # extern crate futures;
//! # extern crate tokio_core;
//! use std::time::Duration;
//! use corona::Coroutine;
//! use futures::Future;
//! use tokio_core::reactor::{Core, Timeout};
//!
//! # fn main() {
//! let mut core = Core::new().unwrap();
//! let builder = Coroutine::new(core.handle());
//! let generator = builder.generator(|producer| {
//!     producer.produce(1);
//!     producer.produce(2);
//! });
//! let coroutine = builder.spawn(move |await| {
//!     for item in await.stream(generator) {
//!         println!("{}", item.unwrap());
//!     }
//!
//!     let timeout = Timeout::new(Duration::from_millis(100), await.handle()).unwrap();
//!     await.future(timeout).unwrap();
//!
//!     42
//! });
//! assert_eq!(42, core.run(coroutine).unwrap());
//! # }
//! ```
//!
//! Further examples can be found in the
//! [repository](https://github.com/vorner/corona/tree/master/examples).
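//!
//! The handle returned from spawning is itself a future, so it also composes with the ordinary
//! future combinators. A minimal sketch of that (assuming, as the API above suggests, that the
//! handle resolves to the closure's return value):
//!
//! ```rust
//! # extern crate corona;
//! # extern crate futures;
//! # extern crate tokio_core;
//! use corona::Coroutine;
//! use futures::Future;
//! use tokio_core::reactor::Core;
//!
//! # fn main() {
//! let mut core = Core::new().unwrap();
//! // The coroutine finishes with the value 21...
//! let coroutine = Coroutine::with_defaults(core.handle(), |_await| 21);
//! // ...and its completion handle can be mapped like any other future.
//! let doubled = coroutine.map(|v| v * 2);
//! assert_eq!(42, core.run(doubled).unwrap());
//! # }
//! ```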
extern crate context;
extern crate futures;
extern crate tokio_core;

use std::any::Any;
use std::cell::{Cell, RefCell};
use std::ops::Deref;
use std::panic::{self, AssertUnwindSafe};
use std::thread;

use context::Context;
use context::stack::{Stack, ProtectedFixedSizeStack};
use futures::{Future, Sink, Stream};
use futures::future;
use futures::unsync::oneshot;
use futures::unsync::mpsc::{self, Sender as ChannelSender};
use tokio_core::reactor::Handle;

pub mod results;

mod errors;
mod stack_cache;
mod switch;

pub use errors::{Dropped, TaskFailed};
use errors::TaskResult;
use results::{CoroutineResult, GeneratorResult, StreamCleanupIterator, StreamIterator};
use switch::Switch;

/// An asynchronous context.
///
/// This is passed to each coroutine closure and can be used to pause (or block) the coroutine,
/// waiting for a future or something similar to complete.
///
/// The context is explicit, for two reasons. One is, it is possible to ensure nobody tries to
/// wait for a future and block outside of a coroutine. The other is, it is more obvious what
/// happens from the code than with some thread-local magic behind the scenes.
///
/// The downside is a little bit less convenience on use.
pub struct Await<'a> {
    context: &'a RefCell<Option<Context>>,
    dropped: Cell<bool>,
    leak_on_panic: bool,
    stack: &'a RefCell<Option<ProtectedFixedSizeStack>>,
    handle: &'a Handle,
}

impl<'a> Await<'a> {
    /// Accesses the handle to the corresponding reactor core.
    ///
    /// This is simply a convenience method, since it is possible to get the handle explicitly into
    /// every place where this can be used. But it is convenient not to have to pass another
    /// variable, and the `Await` and the handle are usually used together.
    pub fn handle(&self) -> &Handle {
        self.handle
    }

    /// Blocks the current coroutine until the future resolves.
    ///
    /// This blocks or parks the current coroutine (and lets other coroutines run) until the
    /// provided future completes. The result of the future is returned.
    ///
    /// # Notes
    ///
    /// For the switching between coroutines to work, the reactor must be running.
    ///
    /// # Panics
    ///
    /// This may panic if the reactor core is dropped before the waited-for future resolves. The
    /// panic is meant to unwind the coroutine's stack so all the memory can be cleaned up.
    ///
    /// # Examples
    ///
    /// ```
    /// # extern crate corona;
    /// # extern crate futures;
    /// # extern crate tokio_core;
    /// use std::time::Duration;
    /// use corona::Coroutine;
    /// use futures::Future;
    /// use tokio_core::reactor::{Core, Timeout};
    ///
    /// # fn main() {
    /// let mut core = Core::new().unwrap();
    /// let coroutine = Coroutine::with_defaults(core.handle(), |await| {
    ///     let timeout = Timeout::new(Duration::from_millis(100), await.handle()).unwrap();
    ///     await.future(timeout).unwrap();
    /// });
    /// core.run(coroutine).unwrap();
    /// # }
    /// ```
    pub fn future<I, E, Fut>(&self, fut: Fut) -> Result<I, E>
        where I: 'static,
              E: 'static,
              Fut: Future<Item = I, Error = E> + 'static
    {
        match self.future_cleanup(fut) {
            Ok(result) => result,
            Err(Dropped) => {
                if self.leak_on_panic && thread::panicking() {
                    let stack = self.stack
                        .borrow_mut()
                        .take()
                        .unwrap();
                    self.switch(Switch::Destroy { stack });
                    unreachable!();
                } else {
                    panic!("Cleaning up the coroutine stack because the reactor Core got dropped");
                }
            },
        }
    }

    // Switch out of the current coroutine and back
    fn switch(&self, switch: Switch) -> Switch {
        let context = self.context
            .borrow_mut()
            .take()
            .unwrap();
        let (reply, context) = switch.exchange(context);
        *self.context.borrow_mut() = Some(context);
        reply
    }

    /// Blocks the current coroutine, just like [`future`](#method.future), but doesn't panic.
    ///
    /// This works similarly to the `future` method. However, it signals if the reactor `Core` has
    /// been destroyed by returning `Err(Dropped)` instead of panicking. This can be used to
    /// manually clean up the coroutine instead of letting a panic do that.
    ///
    /// This is important especially in cases when a clean shutdown is needed even when the `Core`
    /// in the main coroutine is destroyed during a panic, since the `future` method either causes
    /// a double panic (making the program abort) or doesn't do any cleanup at all, depending on
    /// the configuration.
    pub fn future_cleanup<I, E, Fut>(&self, fut: Fut) -> Result<Result<I, E>, Dropped>
        where I: 'static,
              E: 'static,
              Fut: Future<Item = I, Error = E> + 'static
    {
        if self.dropped.get() {
            return Err(Dropped);
        }
        let (sender, receiver) = oneshot::channel();
        let task = fut.then(move |r| {
            // Errors are uninteresting - just the listener missing
            // TODO: Is it even possible?
            drop(sender.send(r));
            Ok(())
        });
        let switch = Switch::ScheduleWakeup {
            after: Box::new(task),
            handle: self.handle.clone(),
        };
        match self.switch(switch) {
            Switch::Resume => (),
            Switch::Cleanup => {
                self.dropped.set(true);
                return Err(Dropped);
            },
            _ => panic!("Invalid instruction on wakeup"),
        }
        // It is safe to .wait(), because once we are resumed, the future already went through.
        // It shouldn't happen that we got canceled under normal circumstances (may need API
        // changes to actually ensure that).
        Ok(receiver.wait().expect("A future should never get dropped"))
    }

    /// Blocks the current coroutine to get each element of the stream.
    ///
    /// This acts in a very similar way to the [`future`](#method.future) method. The difference is
    /// it acts on a stream and produces an iterator instead of a single result. Therefore it may
    /// (and usually will) switch the coroutines more than once.
    ///
    /// Similar notes as with [`future`](#method.future) apply.
    ///
    /// # Panics
    ///
    /// The same ones as with the [`future`](#method.future) method.
    ///
    /// # Examples
    ///
    /// ```
    /// # extern crate corona;
    /// # extern crate futures;
    /// # extern crate tokio_core;
    /// use corona::Coroutine;
    /// use futures::stream;
    /// use tokio_core::reactor::Core;
    ///
    /// # fn main() {
    /// let mut core = Core::new().unwrap();
    /// let coroutine = Coroutine::with_defaults(core.handle(), |await| {
    ///     let stream = stream::empty::<(), ()>();
    ///     for item in await.stream(stream) {
    ///         // Streams can contain errors, so it iterates over `Result`s.
    ///         let item = item.unwrap();
    ///         // Process them here
    ///     }
    /// });
    /// core.run(coroutine).unwrap();
    /// # }
    /// ```
    pub fn stream<I, E, S>(&self, stream: S) -> StreamIterator<I, E, S>
        where S: Stream<Item = I, Error = E> + 'static,
              I: 'static,
              E: 'static
    {
        StreamIterator::new(self, stream)
    }

    /// Blocks the current coroutine to get each element of the stream.
    ///
    /// This is the same as the [`stream`](#method.stream) method, but it doesn't panic when the
    /// core is dropped and the coroutine's stack needs to be cleaned up. Instead it returns
    /// `Err(Dropped)` and leaves the cleanup to the caller.
    ///
    /// The advantage is this works even in case the reactor core is dropped during a panic. See
    /// [`Coroutine::leak_on_panic`](struct.Coroutine.html#method.leak_on_panic) for more details.
    pub fn stream_cleanup<I, E, S>(&self, stream: S) -> StreamCleanupIterator<I, E, S>
        where S: Stream<Item = I, Error = E> + 'static,
              I: 'static,
              E: 'static
    {
        StreamCleanupIterator::new(self, stream)
    }

    /// Switches to another coroutine.
    ///
    /// This allows another coroutine to run. However, if there's no other coroutine ready to run,
    /// this may terminate right away and continue execution. Also, it does *not* guarantee getting
    /// more external events -- the other coroutines can do work on the data that is already
    /// received, for example, but network events will probably arrive only after the coroutine
    /// really waits on something or terminates.
    /// Therefore, doing CPU-intensive work in a coroutine
    /// and repeatedly calling `yield_now` is not guaranteed to work well. Use a separate thread or
    /// something like the [`futures-cpupool`](https://crates.io/crates/futures-cpupool) crate to
    /// off-load the heavy work.
    pub fn yield_now(&self) {
        let fut = future::ok::<_, ()>(());
        self.future(fut).unwrap();
    }

    /// Switches to another coroutine.
    ///
    /// This is the same as [`yield_now`](#method.yield_now), but instead of panicking when the
    /// reactor core is dropped, it returns `Err(Dropped)`.
    pub fn yield_now_cleanup(&self) -> Result<(), Dropped> {
        let fut = future::ok::<_, ()>(());
        self.future_cleanup(fut).map(|_| ())
    }
}

type ItemOrPanic<I> = Result<I, Box<Any + Send + 'static>>;
type ItemSender<I> = ChannelSender<ItemOrPanic<I>>;

/// This is an extended [`Await`](struct.Await.html) with the ability to produce items.
///
/// Just like an ordinary coroutine produces a single return value when it finishes and can
/// suspend its execution using the [`Await`](struct.Await.html) parameter, a generator can do all
/// this and, in addition, produce a series of items of a given type through this parameter.
///
/// See [`Coroutine::generator`](struct.Coroutine.html#method.generator).
pub struct Producer<'a, I: 'static> {
    await: &'a Await<'a>,
    sink: RefCell<Option<ItemSender<I>>>,
}

// TODO: Is this an abuse of Deref? Any better ways?
impl<'a, I: 'static> Deref for Producer<'a, I> {
    type Target = Await<'a>;
    fn deref(&self) -> &Await<'a> {
        self.await
    }
}

impl<'a, I: 'static> Producer<'a, I> {
    /// Creates a new producer.
    ///
    /// While the usual way to get a producer is through
    /// [`Coroutine::generator`](struct.Coroutine.html#method.generator), it is also possible to
    /// create one manually, from an [`Await`](struct.Await.html) and a channel sender of the right
    /// type.
    pub fn new(await: &'a Await<'a>, sink: ItemSender<I>) -> Self {
        Producer {
            await,
            sink: RefCell::new(Some(sink)),
        }
    }

    /// Pushes another value through the internal channel, effectively sending it to another
    /// coroutine.
    ///
    /// This takes a value and pushes it through a channel to another coroutine. It may suspend the
    /// execution of the current coroutine and yield to another one.
    ///
    /// The same notes and panics as with the [`future`](struct.Await.html#method.future) method
    /// apply.
    pub fn produce(&self, item: I) {
        let sink = self.sink.borrow_mut().take();
        if let Some(sink) = sink {
            let future = sink.send(Ok(item));
            if let Ok(s) = self.await.future(future) {
                *self.sink.borrow_mut() = Some(s);
            }
        }
    }
}

/// A builder of coroutines.
///
/// This struct is the main entry point and a way to start coroutines of various kinds. It allows
/// both starting them with default parameters and configuring them with the builder pattern.
#[derive(Clone)]
pub struct Coroutine {
    handle: Handle,
    stack_size: usize,
    leak_on_panic: bool,
}

impl Coroutine {
    /// Starts building a coroutine.
    ///
    /// This constructor produces a new builder for coroutines. The builder can then be used to
    /// specify the configuration of the coroutines.
    ///
    /// It is possible to spawn multiple coroutines from the same builder.
    ///
    /// # Parameters
    ///
    /// * `handle`: The coroutines need a reactor core to run on and schedule their control
    ///   switches. This is the handle to the reactor core to be used.
    ///
    /// # Examples
    ///
    /// ```
    /// # extern crate corona;
    /// # extern crate futures;
    /// # extern crate tokio_core;
    /// use corona::Coroutine;
    /// use futures::stream;
    /// use tokio_core::reactor::Core;
    ///
    /// # fn main() {
    /// let core = Core::new().unwrap();
    /// let builder = Coroutine::new(core.handle());
    ///
    /// let coroutine = builder.spawn(|await| { });
    /// # }
    /// ```
    pub fn new(handle: Handle) -> Self {
        Coroutine {
            handle,
            stack_size: Stack::default_size(),
            leak_on_panic: false,
        }
    }

    /// Spawns a coroutine directly.
    ///
    /// This constructor spawns a coroutine with default parameters without the inconvenience of
    /// handling a builder. It is equivalent to spawning it with an unconfigured builder.
    ///
    /// Unlike [`spawn`](#method.spawn), this one can't fail, since the default parameters
    /// of the builder are expected to always work (if they don't, file a bug).
    ///
    /// # Examples
    ///
    /// ```
    /// # extern crate corona;
    /// # extern crate futures;
    /// # extern crate tokio_core;
    /// use corona::Coroutine;
    /// use futures::stream;
    /// use tokio_core::reactor::Core;
    ///
    /// # fn main() {
    /// let core = Core::new().unwrap();
    ///
    /// let coroutine = Coroutine::with_defaults(core.handle(), |await| { });
    /// # }
    /// ```
    pub fn with_defaults<R, Task>(handle: Handle, task: Task) -> CoroutineResult<R>
        where R: 'static,
              Task: FnOnce(&Await) -> R + 'static
    {
        Coroutine::new(handle).spawn(task)
    }

    fn spawn_inner<Task>(&self, task: Task)
        where Task: FnOnce(Handle, RefCell<Option<Context>>, RefCell<Option<ProtectedFixedSizeStack>>)
                  -> (RefCell<Option<Context>>, RefCell<Option<ProtectedFixedSizeStack>>) + 'static
    {
        let handle = self.handle.clone();
        let perform = move |context, stack| {
            let context = RefCell::new(Some(context));
            let stack = RefCell::new(Some(stack));
            let (context, stack) = task(handle, context, stack);
            (context.into_inner().unwrap(), stack.into_inner().unwrap())
        };
        Switch::run_new_coroutine(self.stack_size, Box::new(Some(perform)));
    }
    /// Spawns a coroutine.
    ///
    /// Spawns the given closure as a coroutine with the parameters configured in the current
    /// builder.
    ///
    /// The closure is started right away and is run inside the call until it either yields
    /// control or terminates. If it yields (for whatever reason, not only through the
    /// [`Await::yield_now`](struct.Await.html#method.yield_now) method), it'll get a chance to
    /// continue only through running the reactor core.
    ///
    /// # Parameters
    ///
    /// * `task`: The closure to run.
    ///
    /// # Panics
    ///
    /// * In case an invalid stack size has been configured. This is a panic and not an error for
    ///   two reasons. It's very unlikely an application using coroutines could continue if it
    ///   can't spawn them. Also, configuring an invalid stack size is a programmer bug.
    ///
    /// # Result
    ///
    /// On a successful call to this method, a `Future` representing the completion of the task is
    /// provided. The future resolves either to the result of the closure or to an error if the
    /// closure panics.
    pub fn spawn<R, Task>(&self, task: Task) -> CoroutineResult<R>
        where R: 'static,
              Task: FnOnce(&Await) -> R + 'static
    {
        let (sender, receiver) = oneshot::channel();
        let leak = self.leak_on_panic;
        let perform_and_send = move |handle, context, stack| {
            {
                let await = Await {
                    context: &context,
                    dropped: Cell::new(false),
                    leak_on_panic: leak,
                    stack: &stack,
                    handle: &handle,
                };
                let result = match panic::catch_unwind(AssertUnwindSafe(move || task(&await))) {
                    Ok(res) => TaskResult::Finished(res),
                    Err(panic) => TaskResult::Panicked(panic),
                };
                // We are not interested in errors. They just mean the receiver is no longer
                // interested, which is fine by us.
                drop(sender.send(result));
            }
            (context, stack)
        };
        self.spawn_inner(perform_and_send);
        CoroutineResult::new(receiver)
    }

    /// Spawns a generator.
    ///
    /// A generator is just like a coroutine (and this method is very similar to the
    /// [`spawn`](#method.spawn) method, so most of its notes apply).
    /// It can, however, produce a
    /// stream of items of a certain kind and has no direct return value. The return value is not
    /// a `Future`, but a `Stream` of the produced items.
    pub fn generator<Item, Task>(&self, task: Task) -> GeneratorResult<Item>
        where Item: 'static,
              Task: FnOnce(&Producer<Item>) + 'static
    {
        let (sender, receiver) = mpsc::channel(1);
        let leak = self.leak_on_panic;
        let generate = move |handle, context, stack| {
            {
                let await = Await {
                    context: &context,
                    dropped: Cell::new(false),
                    leak_on_panic: leak,
                    stack: &stack,
                    handle: &handle,
                };
                let producer = Producer::new(&await, sender.clone());
                match panic::catch_unwind(AssertUnwindSafe(move || task(&producer))) {
                    Ok(_) => (),
                    Err(panic) => drop(await.future(sender.send(Err(panic)))),
                }
            }
            (context, stack)
        };
        self.spawn_inner(generate);
        GeneratorResult::new(receiver)
    }

    /// Configures a stack size for the coroutines.
    ///
    /// The method sets the stack size of the coroutines that'll be spawned from this builder. The
    /// default stack size is platform dependent, but usually something relatively small. It is
    /// fine for most uses that don't use recursion or big on-stack allocations.
    ///
    /// Also, using too many different stack sizes in the same thread is inefficient. The library
    /// caches and reuses stacks, but it can do so only with stacks of the same size.
    ///
    /// # Notes
    ///
    /// If the configured stack size is invalid, attempts to spawn coroutines will fail with a
    /// panic. However, it is platform dependent what is considered valid (multiples of 4096
    /// usually work).
    pub fn stack_size(&mut self, size: usize) -> &mut Self {
        self.stack_size = size;
        self
    }

    /// Configures the leak on panic option.
    ///
    /// If the reactor `Core` is dropped, any outstanding coroutines are cleaned up by panicking
    /// from the function they block on (if it is not one of the `_cleanup` variants). However, if
    /// the `Core` is dropped during a panic, panicking inside the coroutine would abort the
    /// program.
    ///
    /// This option allows skipping the cleanups. Instead of aborting the program, the resources on
    /// the coroutines' stacks are leaked.
    ///
    /// The `_cleanup` routines still return `Err(Dropped)` and allow for manual cleanup.
    pub fn leak_on_panic(&mut self, leak: bool) -> &mut Self {
        self.leak_on_panic = leak;
        self
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::rc::Rc;
    use std::time::Duration;

    use futures::unsync::mpsc;
    use tokio_core::reactor::{Core, Interval, Timeout};

    use super::*;

    /// Test spawning and execution of tasks.
    #[test]
    fn spawn_some() {
        let mut core = Core::new().unwrap();
        let s1 = Rc::new(AtomicBool::new(false));
        let s2 = Rc::new(AtomicBool::new(false));
        let s1c = s1.clone();
        let s2c = s2.clone();
        let handle = core.handle();

        let mut builder = Coroutine::new(handle);
        builder.stack_size(40960);
        let builder_inner = builder.clone();

        let result = builder.spawn(move |_| {
            let result = builder_inner.spawn(move |_| {
                s2c.store(true, Ordering::Relaxed);
                42
            });
            s1c.store(true, Ordering::Relaxed);
            result
        });

        // Both coroutines run to finish
        assert!(s1.load(Ordering::Relaxed), "The outer closure didn't run");
        assert!(s2.load(Ordering::Relaxed), "The inner closure didn't run");
        // The result gets propagated through.
        let extract = result.and_then(|r| r);
        assert_eq!(42, core.run(extract).unwrap());
    }

    /// The panic doesn't kill the main thread, but is reported.
    #[test]
    fn panics() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        match core.run(Coroutine::with_defaults(handle, |_| panic!("Test"))) {
            Err(TaskFailed::Panicked(_)) => (),
            _ => panic!("Panic not reported properly"),
        }
        let handle = core.handle();
        assert_eq!(42, core.run(Coroutine::with_defaults(handle, |_| 42)).unwrap());
    }

    /// Wait for a future to complete.
    #[test]
    fn future_wait() {
        let mut core = Core::new().unwrap();
        let (sender, receiver) = oneshot::channel();
        let all_done = Coroutine::with_defaults(core.handle(), move |await| {
            let msg = await.future(receiver).unwrap();
            msg
        });
        Coroutine::with_defaults(core.handle(), move |await| {
            let timeout = Timeout::new(Duration::from_millis(50), await.handle()).unwrap();
            await.future(timeout).unwrap();
            sender.send(42).unwrap();
        });
        assert_eq!(42, core.run(all_done).unwrap());
    }

    /// Stream can be iterated asynchronously.
    #[test]
    fn stream_iter() {
        let mut core = Core::new().unwrap();
        let stream = Interval::new(Duration::from_millis(10), &core.handle())
            .unwrap()
            .take(3)
            .map(|_| 1);
        let done = Coroutine::with_defaults(core.handle(), move |await| {
            let mut sum = 0;
            for i in await.stream(stream) {
                sum += i.unwrap();
            }
            sum
        });
        assert_eq!(3, core.run(done).unwrap());
    }

    /// A smoke test for yield_now() (that it gets resumed and doesn't crash)
    #[test]
    fn yield_now() {
        let mut core = Core::new().unwrap();
        let done = Coroutine::with_defaults(core.handle(), |await| {
            await.yield_now();
            await.yield_now();
        });
        core.run(done).unwrap();
    }

    #[test]
    fn producer() {
        let mut core = Core::new().unwrap();
        let (sender, receiver) = mpsc::channel(1);
        let done_sender = Coroutine::with_defaults(core.handle(), move |await| {
            let producer = Producer::new(await, sender);
            producer.produce(42);
            producer.produce(12);
        });
        let done_receiver = Coroutine::with_defaults(core.handle(), |await| {
            let result = await.stream(receiver)
                .map(Result::unwrap)
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(vec![42, 12], result);
        });
        let done = Coroutine::with_defaults(core.handle(), move |await| {
            await.future(done_sender).unwrap();
            await.future(done_receiver).unwrap();
        });
        core.run(done).unwrap();
    }

    #[test]
    fn generator() {
        let mut core = Core::new().unwrap();
        let builder = Coroutine::new(core.handle());
        let stream1 = builder.generator(|await| {
            await.produce(42);
            await.produce(12);
        });
        let stream2 =
            builder.generator(|await| {
                for item in await.stream(stream1) {
                    await.produce(item.unwrap());
                }
            });
        let done = builder.spawn(move |await| {
            let mut result = Vec::new();
            for item in await.stream(stream2) {
                result.push(item.unwrap());
            }
            assert_eq!(vec![42, 12], result);
        });
        core.run(done).unwrap();
    }

    /* TODO: This thing deadlocks. Any chance of preventing it from compilation?
    #[test]
    fn blocks() {
        let mut core = Core::new().unwrap();
        let (sender, receiver) = oneshot::channel();
        let handle = core.handle();
        let c = Coroutine::with_defaults(handle.clone(), move |_await| {
            core.run(receiver).unwrap();
        });
        Coroutine::with_defaults(handle, |await| {
            let timeout = Timeout::new(Duration::from_millis(50), await.handle()).unwrap();
            await.future(timeout).unwrap();
            drop(sender.send(42));
        });
        c.wait().unwrap();
    }
    */
}