use std::sync::Arc;
use anyhow::Context;
use dagger_sdk::{PortForward, ServiceUpOpts};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use crate::grpc_component::GrpcComponentClient;
/// Wraps the `dagger_sdk::Query` client obtained inside `dagger_sdk::connect`
/// (see `dagger_client`) so containers can be started from it.
struct DaggerConn {
/// Dagger query client; cloned into each spawned container task.
client: dagger_sdk::Query,
/// Fresh token created in `DaggerConn::new`.
/// NOTE(review): nothing in this file reads this field — confirm whether it is still needed.
cancellation_token: CancellationToken,
}
impl DaggerConn {
    /// Creates a connection wrapper around a clone of `client`, paired with a
    /// fresh cancellation token.
    pub fn new(client: &dagger_sdk::Query) -> Self {
        Self {
            client: client.clone(),
            cancellation_token: CancellationToken::new(),
        }
    }

    /// Starts `image` as a Dagger service forwarded to a free local port and
    /// waits until its gRPC endpoint answers a ping.
    ///
    /// The container runs in a detached task that stops when
    /// `cancellation_token` (or one of its parents) is cancelled. The token is
    /// stored in the returned [`DaggerContainer`] as its handle.
    ///
    /// # Errors
    ///
    /// Returns an error when no local port can be reserved, when the gRPC
    /// client cannot be created, or when the service never answers a ping.
    /// In the latter two cases `cancellation_token` is cancelled first so the
    /// background container task does not outlive the failed attempt.
    pub async fn start_container(
        &self,
        name: &str,
        image: &str,
        cancellation_token: CancellationToken,
    ) -> anyhow::Result<DaggerContainer> {
        let port = Self::reserve_local_port().await?;
        // One URL for both the readiness check and the returned container, so
        // later `grpc_handle` calls connect to exactly what was verified here.
        let url = format!("http://127.0.0.1:{}", port);

        let client = self.client.clone();
        let container_name = name.to_string();
        let container_image = image.to_string();
        let container_token = cancellation_token.child_token();
        tokio::spawn(async move {
            tokio::select! {
                _ = container_token.cancelled() => {},
                res = spawn_container(&client, &container_image, port) => {
                    if let Err(e) = res {
                        tracing::warn!(error=e.to_string(), "container {} failed", container_name);
                    }
                }
            }
        });

        if let Err(e) = Self::wait_until_reachable(&url, port).await {
            // Tear down the container task we just spawned; the caller is
            // going to retry on a different port and would otherwise leak it.
            cancellation_token.cancel();
            return Err(e);
        }

        Ok(DaggerContainer {
            name: name.into(),
            image: image.into(),
            handle: cancellation_token,
            url,
        })
    }

    /// Asks the OS for a free loopback port by binding to port 0, then
    /// releases it again.
    ///
    /// The listener is dropped before the container claims the port, so
    /// another process could in principle grab it in between.
    async fn reserve_local_port() -> anyhow::Result<u16> {
        let listener = match TcpListener::bind("127.0.0.1:0").await {
            Ok(listener) => listener,
            Err(e) => {
                tracing::warn!(error = e.to_string(), "failed to allocate port");
                anyhow::bail!(e);
            }
        };

        let port = listener
            .local_addr()
            .context("failed to find a valid random port, you may've run out")?
            .port();

        Ok(port)
    }

    /// Creates a gRPC client for `url` and pings it up to four times, sleeping
    /// 1s, 2s and 3s between attempts. Fails if no ping succeeds.
    async fn wait_until_reachable(url: &str, port: u16) -> anyhow::Result<()> {
        const PING_ATTEMPTS: u64 = 4;

        let grpc = match GrpcComponentClient::new(url.to_string()).await {
            Ok(grpc) => grpc,
            Err(e) => {
                tracing::warn!(
                    error = e.to_string(),
                    port = port,
                    "failed to bootstrap grpc component, service may not be up yet."
                );
                anyhow::bail!(e);
            }
        };

        let mut last_error = String::new();
        for attempt in 1..=PING_ATTEMPTS {
            match grpc.ping().await {
                Ok(_) => return Ok(()),
                Err(e) => {
                    last_error = e.to_string();
                    tracing::warn!(
                        error = last_error.as_str(),
                        port = port,
                        "failed to ping grpc server, service may not be up yet."
                    );
                    // No point in waiting after the final attempt.
                    if attempt < PING_ATTEMPTS {
                        tokio::time::sleep(std::time::Duration::from_secs(attempt)).await;
                    }
                }
            }
        }

        anyhow::bail!(
            "grpc server on port {} did not answer after {} pings: {}",
            port,
            PING_ATTEMPTS,
            last_error
        )
    }
}
/// Owns the background Dagger connection and starts containers through it.
pub struct DaggerEngine {
/// Root token: `stop` cancels it, and child tokens of it are handed to the
/// connection task and to every started container.
cancellation: CancellationToken,
/// Shared slot for the connection; `None` until the task spawned by `start`
/// has received a `DaggerConn`.
dagger_conn: Arc<tokio::sync::Mutex<Option<DaggerConn>>>,
}
impl DaggerEngine {
pub fn new() -> Self {
Self {
cancellation: CancellationToken::default(),
dagger_conn: Arc::default(),
}
}
pub async fn start(&mut self) -> anyhow::Result<()> {
let cancellation = self.cancellation.child_token();
let dagger_conn = self.dagger_conn.clone();
tokio::spawn(async move {
let mut dagger_conn_handle = dagger_conn.lock().await;
if dagger_conn_handle.is_none() {
let mut dagger_conn = dagger_client(cancellation.child_token()).await;
if let Some(dagger_conn) = dagger_conn.recv().await {
*dagger_conn_handle = Some(dagger_conn);
}
}
});
Ok(())
}
pub async fn stop(&mut self) -> anyhow::Result<()> {
self.cancellation.cancel();
Ok(())
}
pub async fn start_container(
&self,
name: impl Into<String>,
image: impl Into<String>,
) -> anyhow::Result<DaggerContainer> {
let name = name.into();
let image = image.into();
for i in 0..5 {
let channel = self.dagger_conn.lock().await;
if channel.is_some() {
match channel
.as_ref()
.unwrap()
.start_container(&name, &image, self.cancellation.child_token())
.await
{
Ok(container) => return Ok(container),
Err(e) => {
tracing::info!(
container_name = name,
error = e.to_string(),
"failed to get container"
);
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(i)).await
}
anyhow::bail!("failed to find a valid channel, aborting")
}
}
/// Description of a container started through `DaggerConn::start_container`.
pub struct DaggerContainer {
/// Name the caller gave the container.
name: String,
/// The cancellation token that was passed to `DaggerConn::start_container`;
/// the container's background task watches a child of it.
handle: CancellationToken,
/// Image reference the container was started from.
image: String,
/// Loopback address of the forwarded port; handed to
/// `GrpcComponentClient::new` by `grpc_handle`.
url: String,
}
impl DaggerContainer {
    /// Opens a new gRPC client against this container's stored address.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `GrpcComponentClient::new` reports.
    pub async fn grpc_handle(&self) -> anyhow::Result<GrpcComponentClient> {
        Ok(GrpcComponentClient::new(&self.url).await?)
    }
}
/// Connects to Dagger on a background task and hands the resulting
/// [`DaggerConn`] back through the returned single-slot channel.
///
/// After sending the connection, the connect closure stays alive until
/// `cancellation` is cancelled. If connecting (or sending) fails, a warning is
/// logged and the receiver yields `None` once the sender is dropped.
async fn dagger_client(cancellation: CancellationToken) -> tokio::sync::mpsc::Receiver<DaggerConn> {
    let (sender, receiver) = tokio::sync::mpsc::channel::<DaggerConn>(1);

    tokio::spawn(async move {
        let outcome = dagger_sdk::connect(|client| async move {
            sender.send(DaggerConn::new(&client)).await?;
            // Keep the session open until the owner asks us to stop.
            cancellation.cancelled().await;
            Ok(())
        })
        .await;

        if let Err(e) = outcome {
            tracing::warn!(
                error = e.to_string(),
                "failed to handle dagger connect, components may not be executed as they should "
            );
        }
    });

    receiver
}
/// Runs `image` as a Dagger service executing `nodata-transformer-test`, with
/// container port 7900 forwarded over TCP to `outer_port` on the host.
///
/// # Errors
///
/// Propagates the error returned by bringing the service up.
async fn spawn_container(
    client: &dagger_sdk::Query,
    image: &str,
    outer_port: u16,
) -> anyhow::Result<()> {
    // u16 -> isize is a widening conversion on supported targets.
    let frontend = outer_port as isize;
    tracing::debug!(image = image, outer_port = frontend, "spawning container");

    let forward = PortForward {
        frontend,
        backend: 7900,
        protocol: dagger_sdk::NetworkProtocol::Tcp,
    };
    let up_opts = ServiceUpOpts {
        ports: Some(vec![forward]),
        // Use exactly the requested host port rather than a random one.
        random: Some(false),
    };

    client
        .container()
        .from(image)
        .with_exposed_port(7900)
        .with_exec(vec!["nodata-transformer-test"])
        .as_service()
        .up_opts(up_opts)
        .await?;

    Ok(())
}
#[cfg(test)]
mod tests {
    use tracing_test::traced_test;

    use super::*;

    /// End-to-end check: start the engine, start a container from the test
    /// image and open a gRPC handle to it. Needs a working Dagger setup.
    #[tokio::test]
    #[traced_test]
    async fn test_can_use_dagger_engine() -> anyhow::Result<()> {
        const IMAGE: &str = "docker.io/kasperhermansen/nodata-transformer-test:main-1723938433";

        let mut engine = DaggerEngine::new();

        tracing::info!("starting dagger engine");
        engine.start().await?;

        tracing::info!("starting dagger container");
        let container = engine.start_container("some_name", IMAGE).await?;

        tracing::info!("getting grpc handle");
        let _ = container.grpc_handle().await?;

        Ok(())
    }
}