Skip to main content

ArmadaClient

Struct ArmadaClient 

Source
pub struct ArmadaClient { /* private fields */ }
Expand description

Armada gRPC client providing job submission and event-stream watching.

§Construction

Use ArmadaClient::connect for plaintext (dev / in-cluster) connections, ArmadaClient::connect_tls for production clusters using system root certificates, or ArmadaClient::connect_tls_with_config when you need a custom CA, domain override, or mutual TLS. All constructors accept any TokenProvider — pass crate::StaticTokenProvider for a static bearer token or supply your own implementation for dynamic auth.

// Plaintext
let client = ArmadaClient::connect("http://localhost:50051", StaticTokenProvider::new("tok"))
    .await?;

// TLS (uses system root certificates)
let client = ArmadaClient::connect_tls("https://armada.example.com:443", StaticTokenProvider::new("tok"))
    .await?;

// TLS with a custom CA
let pem = std::fs::read("ca.pem")?;
let client = ArmadaClient::connect_tls_with_config(
    "https://armada.example.com:443",
    ClientTlsConfig::new().ca_certificate(Certificate::from_pem(pem)),
    StaticTokenProvider::new("tok"),
).await?;

§Cloning

ArmadaClient is Clone. All clones share the same underlying channel and connection pool — cloning is O(1) and is the correct way to distribute the client across tasks:

let client = ArmadaClient::connect("http://localhost:50051", StaticTokenProvider::new("tok"))
    .await?;

let c1 = client.clone();
let c2 = client.clone();
tokio::spawn(async move { /* use c1 */ });
tokio::spawn(async move { /* use c2 */ });

§Timeouts

Apply a per-call deadline with ArmadaClient::with_timeout. When set, every RPC is governed by the deadline for its entire duration. For unary calls (submit) the deadline covers the round-trip. For streaming calls (watch) it covers the full lifetime of the stream — if the timeout elapses while events are still being received the stream is cancelled with a DEADLINE_EXCEEDED status.

Implementations§

Source§

impl ArmadaClient

Source

pub async fn connect( endpoint: impl Into<String>, token_provider: impl TokenProvider + 'static, ) -> Result<Self, Error>

Connect to an Armada server at endpoint using plaintext (no TLS).

endpoint must be a valid URI, e.g. "http://localhost:50051".

§Errors

Returns Error::InvalidUri if the URI is malformed, or Error::Transport if the connection cannot be established.

Source

pub async fn connect_tls( endpoint: impl Into<String>, token_provider: impl TokenProvider + 'static, ) -> Result<Self, Error>

Connect to an Armada server at endpoint using TLS.

Uses the system’s native root certificates to verify the server certificate. endpoint should use the https:// scheme, e.g. "https://armada.example.com:443".

For clusters with a private or self-signed CA, use ArmadaClient::connect_tls_with_config instead.

§Errors

Returns Error::InvalidUri if the URI is malformed, or Error::Transport if TLS configuration or the connection fails.

Source

pub async fn connect_tls_with_config( endpoint: impl Into<String>, tls_config: ClientTlsConfig, token_provider: impl TokenProvider + 'static, ) -> Result<Self, Error>

Connect to an Armada server at endpoint using a caller-supplied TLS config.

Use this when you need to supply a custom CA certificate (e.g. a private or self-signed CA), override the server domain name, or configure mutual TLS. Build the config with tonic::transport::ClientTlsConfig, accessible via armada_client::tonic::transport::ClientTlsConfig — no direct tonic dependency needed.

§Example — custom CA
use armada_client::{ArmadaClient, StaticTokenProvider};
use armada_client::tonic::transport::{Certificate, ClientTlsConfig};

let pem = std::fs::read("ca.pem")?;
let tls = ClientTlsConfig::new().ca_certificate(Certificate::from_pem(pem));
let client = ArmadaClient::connect_tls_with_config(
    "https://armada.example.com:443",
    tls,
    StaticTokenProvider::new("tok"),
).await?;
§Errors

Returns Error::InvalidUri if the URI is malformed, or Error::Transport if TLS configuration or the connection fails.

Source

pub fn with_timeout(self, timeout: Duration) -> Self

Set a default timeout applied to every RPC call.

When the timeout elapses the call fails with Error::Grpc wrapping a DEADLINE_EXCEEDED status. For streaming calls like ArmadaClient::watch, the deadline covers the entire stream duration — if it elapses while events are still arriving the stream is cancelled immediately.

Returns self so the call can be chained directly after construction:

let client = ArmadaClient::connect("http://localhost:50051", StaticTokenProvider::new("tok"))
    .await?
    .with_timeout(Duration::from_secs(30));
Source

pub async fn submit( &self, request: JobSubmitRequest, ) -> Result<JobSubmitResponse, Error>

Submit a batch of jobs to Armada.

Attaches an authorization header on every call using the configured TokenProvider (e.g. Bearer <token> or Basic <credentials>). Multiple job items can be included in a single request — they are all submitted atomically to the same queue and job set.

§Example
use armada_client::{
    ArmadaClient, JobRequestItemBuilder, JobSubmitRequest, StaticTokenProvider,
};
use armada_client::k8s::io::api::core::v1::PodSpec;

let item = JobRequestItemBuilder::new()
    .namespace("default")
    .pod_spec(PodSpec { containers: vec![], ..Default::default() })
    .build();

let response = client
    .submit(JobSubmitRequest {
        queue: "my-queue".into(),
        job_set_id: "my-job-set".into(),
        job_request_items: vec![item],
    })
    .await?;

for r in &response.job_response_items {
    if r.error.is_empty() {
        println!("submitted: {}", r.job_id);
    } else {
        eprintln!("rejected: {}", r.error);
    }
}
§Errors
Source

pub async fn cancel_jobs( &self, request: JobCancelRequest, ) -> Result<CancellationResult, Error>

Cancel one or more jobs.

§Arguments
  • request.queue — Armada queue name.
  • request.job_set_id — Job set the jobs belong to.
  • request.job_id — Single job ID to cancel (legacy, optional).
  • request.job_ids — Multiple job IDs to cancel in one call.
  • request.reason — Human-readable cancellation reason (optional).
§Example
use armada_client::{ArmadaClient, JobCancelRequest, StaticTokenProvider};

let result = client
    .cancel_jobs(JobCancelRequest {
        queue: "my-queue".into(),
        job_set_id: "my-job-set".into(),
        job_ids: vec!["01abc".into(), "01def".into()],
        reason: "no longer needed".into(),
        ..Default::default()
    })
    .await?;

println!("cancelled: {:?}", result.cancelled_ids);
§Errors
Source

pub async fn cancel_job_set( &self, request: JobSetCancelRequest, ) -> Result<(), Error>

Cancel all (or a filtered subset of) jobs in a job set.

§Arguments
  • request.queue — Armada queue name.
  • request.job_set_id — Job set to cancel.
  • request.filter — Optional crate::JobSetFilter limiting cancellation to jobs in specific states (e.g. only Queued and Running). Pass None to cancel all non-terminal jobs.
  • request.reason — Human-readable cancellation reason (optional).
§Example
use armada_client::{ArmadaClient, JobSetCancelRequest, JobSetFilter, JobState, StaticTokenProvider};

client
    .cancel_job_set(JobSetCancelRequest {
        queue: "my-queue".into(),
        job_set_id: "my-job-set".into(),
        filter: Some(JobSetFilter {
            states: vec![JobState::Queued as i32, JobState::Running as i32],
        }),
        reason: "aborting experiment".into(),
    })
    .await?;
§Errors
Source

pub async fn watch( &self, queue: impl Into<String>, job_set_id: impl Into<String>, from_message_id: Option<String>, ) -> Result<BoxStream<'static, Result<EventStreamMessage, Error>>, Error>

Watch a job set, returning a stream of events.

Opens a server-streaming gRPC call and returns a BoxStream that yields EventStreamMessage values as the server pushes them. The stream ends when the server closes the connection.

Reconnection is the caller’s responsibility. Store the last message_id you received and pass it back as from_message_id when reconnecting to avoid replaying events you have already processed.

§Arguments
  • queue — Armada queue name.
  • job_set_id — Job set to watch.
  • from_message_id — Optional resume cursor. Pass Some(id) to receive only events that occurred after id; pass None to receive all events from the beginning.
§Example
use futures::StreamExt;
use armada_client::{ArmadaClient, StaticTokenProvider};

let mut stream = client
    .watch("my-queue", "my-job-set", None)
    .await?;

let mut last_id = String::new();
while let Some(result) = stream.next().await {
    match result {
        Ok(msg) => {
            last_id = msg.id.clone();
            println!("event id={} message={:?}", msg.id, msg.message);
        }
        Err(e) => {
            eprintln!("stream error: {e}");
            break; // reconnect using `from_message_id: Some(last_id)`
        }
    }
}
§Errors
  • Error::Auth if the token provider fails.
  • Error::InvalidMetadata if the token contains invalid header characters.
  • Error::Grpc if the server returns a non-OK status on the initial call. The stream will not error simply because the job set does not exist yet — it will wait for events, which avoids races when watch is called immediately after submit.
  • Individual stream items may also be [Err(Error::Grpc)] if the server sends a trailing error status.

Trait Implementations§

Source§

impl Clone for ArmadaClient

Source§

fn clone(&self) -> ArmadaClient

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more