nodata 0.1.0

nodata is a kafka like message broker that is simple and easy to use, while relying on either local or s3 like data storage for consistency
use std::sync::Arc;

use anyhow::Context;
use dagger_sdk::{PortForward, ServiceUpOpts};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;

use crate::grpc_component::GrpcComponentClient;

struct DaggerConn {
    client: dagger_sdk::Query,
    cancellation_token: CancellationToken,
}

impl DaggerConn {
    pub fn new(client: &dagger_sdk::Query) -> Self {
        Self {
            client: client.clone(),
            cancellation_token: CancellationToken::new(),
        }
    }

    pub async fn start_container(
        &self,
        name: &str,
        image: &str,
        cancellation_token: CancellationToken,
    ) -> anyhow::Result<DaggerContainer> {
        let client = self.client.clone();

        // Bind to the os, and let it select a random port above > 30000
        let component_listener = match TcpListener::bind("127.0.0.1:0").await {
            // FIXME: dagger doesn't support isize for some reason?
            Ok(listener) => listener,
            Err(e) => {
                tracing::warn!(error = e.to_string(), "failed to allocate port");
                anyhow::bail!(e);
            }
        };

        let port = component_listener
            .local_addr()
            .context("failed to find a valid random port, you may've run out")?
            .port();

        // free up reserved listener
        drop(component_listener);

        // Let the blocking container run in the background, maintained by the cancellation token handle in the dagger container
        let container_name = name.to_string();
        let container_token = cancellation_token.child_token();
        let container_image = image.to_string();

        tokio::spawn(async move {
            tokio::select! {
                _ = container_token.cancelled() => {},
                res = spawn_container(&client, &container_image, port) => {
                    if let Err(e) = res {
                        tracing::warn!(error=e.to_string(), "container {} failed", container_name);
                    }
                }
            }
        });

        let grpc = match GrpcComponentClient::new(format!("http://127.0.0.1:{}", port)).await {
            Ok(grpc) => grpc,
            Err(e) => {
                tracing::warn!(
                    error = e.to_string(),
                    port = port,
                    "failed to bootstrap grpc component, service may not be up yet."
                );

                anyhow::bail!(e);
            }
        };

        for i in 1..5 {
            match grpc.ping().await {
                Ok(_) => {
                    // TODO: Finally send something back to the caller
                    break;
                }
                Err(e) => {
                    tracing::warn!(
                        error = e.to_string(),
                        port = port,
                        "failed to ping grpc server, service may not be up yet."
                    );

                    tokio::time::sleep(std::time::Duration::from_secs(i)).await;
                }
            }
        }

        Ok(DaggerContainer {
            name: name.into(),
            image: image.into(),
            handle: cancellation_token,
            url: format!("127.0.0.1:{}", port),
        })
    }
}

pub struct DaggerEngine {
    cancellation: CancellationToken,

    dagger_conn: Arc<tokio::sync::Mutex<Option<DaggerConn>>>,
}

impl DaggerEngine {
    pub fn new() -> Self {
        Self {
            cancellation: CancellationToken::default(),
            dagger_conn: Arc::default(),
        }
    }

    pub async fn start(&mut self) -> anyhow::Result<()> {
        let cancellation = self.cancellation.child_token();
        let dagger_conn = self.dagger_conn.clone();

        tokio::spawn(async move {
            let mut dagger_conn_handle = dagger_conn.lock().await;

            if dagger_conn_handle.is_none() {
                let mut dagger_conn = dagger_client(cancellation.child_token()).await;

                if let Some(dagger_conn) = dagger_conn.recv().await {
                    *dagger_conn_handle = Some(dagger_conn);
                }
            }
        });

        Ok(())
    }

    pub async fn stop(&mut self) -> anyhow::Result<()> {
        self.cancellation.cancel();

        Ok(())
    }

    pub async fn start_container(
        &self,
        name: impl Into<String>,
        image: impl Into<String>,
    ) -> anyhow::Result<DaggerContainer> {
        let name = name.into();
        let image = image.into();

        for i in 0..5 {
            let channel = self.dagger_conn.lock().await;
            if channel.is_some() {
                // TODO: fill out

                match channel
                    .as_ref()
                    .unwrap()
                    .start_container(&name, &image, self.cancellation.child_token())
                    .await
                {
                    Ok(container) => return Ok(container),
                    Err(e) => {
                        tracing::info!(
                            container_name = name,
                            error = e.to_string(),
                            "failed to get container"
                        );
                    }
                }
            }

            tokio::time::sleep(std::time::Duration::from_secs(i)).await
        }

        anyhow::bail!("failed to find a valid channel, aborting")
    }
}

pub struct DaggerContainer {
    name: String,
    handle: CancellationToken,
    image: String,
    url: String,
}

impl DaggerContainer {
    pub async fn grpc_handle(&self) -> anyhow::Result<GrpcComponentClient> {
        let client = GrpcComponentClient::new(&self.url).await?;

        Ok(client)
    }
}

async fn dagger_client(cancellation: CancellationToken) -> tokio::sync::mpsc::Receiver<DaggerConn> {
    let (tx, rx) = tokio::sync::mpsc::channel::<DaggerConn>(1);

    tokio::spawn(async move {
        if let Err(e) = dagger_sdk::connect(|client| async move {
            tx.send(DaggerConn::new(&client)).await?;

            cancellation.cancelled().await;

            Ok(())
        })
        .await
        {
            tracing::warn!(
                error = e.to_string(),
                "failed to handle dagger connect, components may not be executed as they should "
            );
        }
    });

    rx
}

async fn spawn_container(
    client: &dagger_sdk::Query,
    image: &str,
    outer_port: u16,
) -> anyhow::Result<()> {
    tracing::debug!(
        image = image,
        outer_port = outer_port as isize,
        "spawning container"
    );

    let service = client
        .container()
        .from(image)
        .with_exposed_port(7900)
        .with_exec(vec!["nodata-transformer-test"])
        .as_service();

    service
        .up_opts(ServiceUpOpts {
            ports: Some(vec![PortForward {
                frontend: outer_port as isize,
                backend: 7900,
                protocol: dagger_sdk::NetworkProtocol::Tcp,
            }]),
            random: Some(false),
        })
        .await?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use tracing_test::traced_test;

    use super::*;

    #[tokio::test]
    #[traced_test]
    async fn test_can_use_dagger_engine() -> anyhow::Result<()> {
        let mut dagger_engine = DaggerEngine::new();

        tracing::info!("starting dagger engine");
        dagger_engine.start().await?;

        tracing::info!("starting dagger container");
        let container = dagger_engine
            .start_container(
                "some_name",
                "docker.io/kasperhermansen/nodata-transformer-test:main-1723938433",
            )
            .await?;

        tracing::info!("getting grpc handle");
        let _ = container.grpc_handle().await?;

        Ok(())
    }
}