pipeworks 0.1.0

Robust data processing pipelines
use std::{thread, time::Duration};

use pipeworks_tasks::{
    task_spawn::{
        DropMessage, RestartStrategy, sleep_forever, spawn_fallible, spawn_infallible,
        spawn_supervised,
    },
    task_state::{EVENT_CHANNEL, TaskStateTrackingLayer},
};
use tokio::{sync::broadcast, time::sleep};
use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt};

#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(TaskStateTrackingLayer)
        .init();

    let (tx, mut rx) = broadcast::channel(1024);

    // Let's listen in on task events for the whole tree (up to and including "root")
    tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            println!("{}", event);
        }
    });

    // Spawn a "Supervised" task, which is an infallible task running an infinite loop that
    // repeatedly creates a fallible task to execute the given closure. When/if the closure returns
    // or panics, all children are killed, the supervisor waits for all children to be fully shut
    // down, then executes the Restart Strategy. That is to say: even when immediately rebooting,
    // the supervisor will not restart until all transitive child tasks are fully cleaned up and
    // destructors have been run. This is VERY useful for ensuring resource locks (like TCP
    // Sockets) have been fully closed before restarting the task without race conditions.
    EVENT_CHANNEL
        .scope(tx, async move {
            spawn_infallible("Root", async move {
                spawn_supervised(
                    "Restart-Immediate",
                    RestartStrategy::Immediate,
                    async || {
                        let guard = DropMessage::new();

                        // Spawn a child infallible task. If this tasks returns or panics, the parent task (and
                        // all it's dependents) will also be failed.
                        spawn_infallible("Infallible Never", async move {
                            let _slow_drop = SlowDrop;
                            let guard = DropMessage::new();
                            //sleep(Duration::MAX).await;
                            sleep_forever().await;
                            guard.will_return();
                        });

                        // This is the task that will cause the outer supervisor to reboot in this example, after
                        // 10ms it just returns (which is always considered a failure).
                        spawn_infallible("Infallible 10ms", async move {
                            let guard = DropMessage::new();
                            sleep(Duration::from_millis(10)).await;
                            guard.will_return();
                        });

                        // Spawns a fallible task. Another way to think of this is as an 'isolated infallible
                        // sub-tree'. That is, in fact, exactly how it's implemented. When the parent task (the
                        // outer supervisor) is failed this task will be killed like normal. But if a child of this
                        // task fails, the failure does not propagate up to the parent task. This is true of both
                        // returns and panics.
                        spawn_fallible("Fallable", async move {
                            let guard = DropMessage::new();

                            // This will cause the fallible task to fail.
                            spawn_infallible("Fallable->Infallable 5ms", async move {
                                let guard = DropMessage::new();
                                sleep(Duration::from_millis(5)).await;
                                guard.will_return();
                            });

                            // This will be killed when the above task returns after 5-ish ms.
                            spawn_infallible("Fallable->Infallable 9ms", async move {
                                let guard = DropMessage::new();
                                sleep(Duration::from_millis(9)).await;
                                guard.will_return();
                            });

                            // This waits until the current isolated sub-tree (in this case, the contents of our
                            // fallible task) are done running. It's a good way of saying "I don't have anything
                            // else to do, I've spawned children, but I don't want to return because that's always
                            // considered a failure".
                            sleep_forever().await;
                            guard.will_return();
                        });

                        // Spawn a child Supervised task. It's no different from the outer supervisor; it behaves
                        // just like a fallible task in this context, in that it will never fail the parent, but
                        // continually retries the child. Also like a fallible task, it too will be killed if the
                        // parent is killed (in this case, when the outer supervisor reboots).
                        spawn_supervised(
                            "Nested Supervisor",
                            RestartStrategy::SleepBetween(Duration::from_millis(1)),
                            async || {
                                let guard = DropMessage::new();
                                sleep(Duration::from_millis(2)).await;
                                guard.will_return();
                            },
                        );

                        sleep_forever().await;
                        guard.will_return();
                    },
                );

                sleep_forever().await;
            });
        })
        .await;

    sleep(Duration::from_millis(20)).await;
}

pub struct SlowDrop;

impl Drop for SlowDrop {
    fn drop(&mut self) {
        thread::sleep(Duration::from_millis(10));
    }
}