Struct Client

pub struct Client { /* private fields */ }

Client for Feldera API

With Feldera, users create data pipelines out of SQL programs. A SQL program comprises tables and views, along with the definitions of the input and output connectors attached to them. A connector defines a data source that feeds input data into a table, or a data sink that receives output data computed by a view.

§Pipeline

The API is centered around the pipeline, which consists primarily of the SQL program, accompanied by metadata and configuration parameters (e.g., compilation profile, number of workers). A minimal client-side sketch of the pipeline lifecycle follows the list below.

  • A pipeline is identified and referred to by its user-provided unique name.
  • The pipeline program is asynchronously compiled when the pipeline is first created or when its program is subsequently updated.
  • Pipeline deployment is only possible once the program is successfully compiled.
  • A pipeline cannot be updated while it is deployed.
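A minimal sketch of this lifecycle, assuming an async context in which ? can be used, that the builder setters accept plain string values, that client is a Client constructed as described below, and that body is a valid pipeline descriptor built from the generated types module (its construction is omitted here); the pipeline name is a placeholder:

// Create the pipeline; its SQL program is compiled asynchronously.
let _created = client.post_pipeline()
    .body(body)
    .send()
    .await?;

// Fetch the pipeline; in practice, poll this endpoint until its program
// reports successful compilation (the relevant status fields are elided here).
let _pipeline = client.get_pipeline()
    .pipeline_name("my-pipeline")
    .send()
    .await?;

// Once compilation has succeeded, deploy the pipeline.
client.post_pipeline_start()
    .pipeline_name("my-pipeline")
    .send()
    .await?;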

§Concurrency

Each pipeline has a version, which is incremented each time its core fields are updated and is therefore monotonically increasing. There is additionally a program version, which covers only the program-related core fields and is used by the compiler to determine when to recompile.

Version: 0.104.0

Implementations§

impl Client

pub fn new(baseurl: &str) -> Self

Create a new client.

baseurl is the base URL provided to the internal reqwest::Client, and should include a scheme and hostname, as well as port and a path stem if applicable.
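For example, pointing the client at a local Feldera instance (the URL is a placeholder):

let client = Client::new("http://localhost:8080");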

pub fn new_with_client(baseurl: &str, client: Client) -> Self

Construct a new client with an existing reqwest::Client, allowing more control over its configuration.

baseurl is the base URL provided to the internal reqwest::Client, and should include a scheme and hostname, as well as port and a path stem if applicable.
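A sketch of supplying a pre-configured reqwest::Client, here with a request timeout; the URL and timeout values are illustrative only:

use std::time::Duration;

// Build a reqwest client with custom settings, then hand it to the API client.
let http = reqwest::Client::builder()
    .timeout(Duration::from_secs(30))
    .build()
    .expect("failed to build reqwest client");
let client = Client::new_with_client("http://localhost:8080", http);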

pub fn baseurl(&self) -> &String

Get the base URL to which requests are made.

pub fn client(&self) -> &Client

Get the internal reqwest::Client used to make requests.

pub fn api_version(&self) -> &'static str

Get the version of this API.

This string is pulled directly from the source OpenAPI document and may be in any format the API selects.

impl Client

pub fn get_config_authentication(&self) -> GetConfigAuthentication<'_>

Retrieve authentication provider configuration

Sends a GET request to /config/authentication

let response = client.get_config_authentication()
    .send()
    .await;

pub fn list_api_keys(&self) -> ListApiKeys<'_>

Retrieve the list of API keys

Sends a GET request to /v0/api_keys

let response = client.list_api_keys()
    .send()
    .await;

pub fn post_api_key(&self) -> PostApiKey<'_>

Create a new API key

Sends a POST request to /v0/api_keys

Arguments:

  • body:
let response = client.post_api_key()
    .body(body)
    .send()
    .await;

pub fn get_api_key(&self) -> GetApiKey<'_>

Retrieve an API key

Sends a GET request to /v0/api_keys/{api_key_name}

Arguments:

  • api_key_name: Unique API key name
let response = client.get_api_key()
    .api_key_name(api_key_name)
    .send()
    .await;

pub fn delete_api_key(&self) -> DeleteApiKey<'_>

Delete an API key

Sends a DELETE request to /v0/api_keys/{api_key_name}

Arguments:

  • api_key_name: Unique API key name
let response = client.delete_api_key()
    .api_key_name(api_key_name)
    .send()
    .await;

pub fn get_config(&self) -> GetConfig<'_>

Retrieve general configuration

Sends a GET request to /v0/config

let response = client.get_config()
    .send()
    .await;

pub fn get_config_demos(&self) -> GetConfigDemos<'_>

Retrieve the list of demos

Sends a GET request to /v0/config/demos

let response = client.get_config_demos()
    .send()
    .await;

pub fn get_metrics(&self) -> GetMetrics<'_>

Retrieve the metrics of all running pipelines belonging to this tenant

The metrics are collected by making an individual HTTP request to the /metrics endpoint of each pipeline; only successful responses are included in the returned list.

Sends a GET request to /v0/metrics

let response = client.get_metrics()
    .send()
    .await;

pub fn list_pipelines(&self) -> ListPipelines<'_>

Retrieve the list of pipelines

Configure which fields are included using the selector query parameter.

Sends a GET request to /v0/pipelines

Arguments:

  • selector: The selector parameter limits which fields are returned for a pipeline. This is particularly handy, for instance, when frequently polling pipeline status over a low-bandwidth connection.
let response = client.list_pipelines()
    .selector(selector)
    .send()
    .await;

pub fn post_pipeline(&self) -> PostPipeline<'_>

Create a new pipeline

Sends a POST request to /v0/pipelines

let response = client.post_pipeline()
    .body(body)
    .send()
    .await;

pub fn get_pipeline(&self) -> GetPipeline<'_>

Retrieve a pipeline

Configure which fields are included using the selector query parameter.

Sends a GET request to /v0/pipelines/{pipeline_name}

Arguments:

  • pipeline_name: Unique pipeline name
  • selector: The selector parameter limits which fields are returned for a pipeline. This is particularly handy, for instance, when frequently polling pipeline status over a low-bandwidth connection.
let response = client.get_pipeline()
    .pipeline_name(pipeline_name)
    .selector(selector)
    .send()
    .await;

pub fn put_pipeline(&self) -> PutPipeline<'_>

Fully update a pipeline if it already exists, otherwise create a new pipeline

Sends a PUT request to /v0/pipelines/{pipeline_name}

Arguments:

  • pipeline_name: Unique pipeline name
  • body
let response = client.put_pipeline()
    .pipeline_name(pipeline_name)
    .body(body)
    .send()
    .await;

pub fn delete_pipeline(&self) -> DeletePipeline<'_>

Delete a pipeline

Sends a DELETE request to /v0/pipelines/{pipeline_name}

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.delete_pipeline()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn patch_pipeline(&self) -> PatchPipeline<'_>

Partially update a pipeline

Sends a PATCH request to /v0/pipelines/{pipeline_name}

Arguments:

  • pipeline_name: Unique pipeline name
  • body
let response = client.patch_pipeline()
    .pipeline_name(pipeline_name)
    .body(body)
    .send()
    .await;

pub fn checkpoint_pipeline(&self) -> CheckpointPipeline<'_>

Initiate a checkpoint for a running or paused pipeline

Returns a checkpoint sequence number that can be used with /checkpoint_status to determine when the checkpoint has completed.

Sends a POST request to /v0/pipelines/{pipeline_name}/checkpoint

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.checkpoint_pipeline()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn get_checkpoint_status(&self) -> GetCheckpointStatus<'_>

Retrieve status of checkpoint activity in a pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/checkpoint_status

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_checkpoint_status()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn get_pipeline_circuit_profile(&self) -> GetPipelineCircuitProfile<'_>

Retrieve the circuit performance profile of a running or paused pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/circuit_profile

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_pipeline_circuit_profile()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn post_pipeline_clear(&self) -> PostPipelineClear<'_>

Clear the pipeline storage asynchronously

IMPORTANT: Clearing means disassociating the storage from the pipeline. Depending on the storage type, this can include its deletion.

This sets the storage state to Clearing, after which the clearing process is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints. A /clear request cannot be cancelled.

Sends a POST request to /v0/pipelines/{pipeline_name}/clear

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.post_pipeline_clear()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn completion_status(&self) -> CompletionStatus<'_>

Check the status of a completion token returned by the /ingress or /completion_token endpoint

Sends a GET request to /v0/pipelines/{pipeline_name}/completion_status

Arguments:

  • pipeline_name: Unique pipeline name
  • token: Completion token returned by the ‘/ingress’ or ‘/completion_token’ endpoint.
let response = client.completion_status()
    .pipeline_name(pipeline_name)
    .token(token)
    .send()
    .await;

pub fn http_output(&self) -> HttpOutput<'_>

Subscribe to a stream of updates from a SQL view or table

The pipeline responds with a continuous stream of changes to the specified table or view, encoded using the format specified in the ?format= parameter. Updates are split into Chunks.

The pipeline continues sending updates until the client closes the connection or the pipeline is stopped.

Sends a POST request to /v0/pipelines/{pipeline_name}/egress/{table_name}

Arguments:

  • pipeline_name: Unique pipeline name
  • table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
  • array: Set to true to group updates in this stream into JSON arrays (used in conjunction with format=json). The default value is false.
  • backpressure: Apply backpressure on the pipeline when the HTTP client cannot receive data fast enough. When this flag is set to false (the default), the HTTP connector drops data chunks if the client is not keeping up with its output. This prevents a slow HTTP client from slowing down the entire pipeline. When the flag is set to true, the connector waits for the client to receive each chunk and blocks the pipeline if the client cannot keep up.
  • format: Output data format, e.g., ‘csv’ or ‘json’.
let response = client.http_output()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .array(array)
    .backpressure(backpressure)
    .format(format)
    .send()
    .await;

pub fn get_pipeline_heap_profile(&self) -> GetPipelineHeapProfile<'_>

Retrieve the heap profile of a running or paused pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/heap_profile

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_pipeline_heap_profile()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn http_input(&self) -> HttpInput<'_>

Push data to a SQL table

The client sends data encoded using the format specified in the ?format= parameter as the body of the request. The contents of the data must match the SQL table schema specified in table_name.

The pipeline ingests data as it arrives, without waiting for the end of the request. A successful HTTP response indicates that all data has been ingested successfully.

On success, returns a completion token that can be passed to the ‘/completion_status’ endpoint to check whether the pipeline has fully processed the data.

Sends a POST request to /v0/pipelines/{pipeline_name}/ingress/{table_name}

Arguments:

  • pipeline_name: Unique pipeline name
  • table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
  • array: Set to true if updates in this stream are packaged into JSON arrays (used in conjunction with format=json). The default value is false.
  • force: When true, push data to the pipeline even if the pipeline is paused. The default value is false.
  • format: Input data format, e.g., ‘csv’ or ‘json’.
  • update_format: JSON data change event format (used in conjunction with format=json). The default value is ‘insert_delete’.
  • body: Input data in the specified format
let response = client.http_input()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .array(array)
    .force(force)
    .format(format)
    .update_format(update_format)
    .body(body)
    .send()
    .await;
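As a more concrete, hedged sketch: pushing two newline-delimited JSON insert events into a hypothetical PEOPLE table, assuming an async context, that the string-typed setters accept plain &str values, and that the body setter accepts anything convertible into a request body:

// Two insert events in the default 'insert_delete' update format.
let payload = r#"{"insert": {"id": 1, "name": "alice"}}
{"insert": {"id": 2, "name": "bob"}}"#;

let response = client.http_input()
    .pipeline_name("my-pipeline")  // placeholder pipeline name
    .table_name("PEOPLE")          // unquoted SQL names must be capitalized
    .format("json")
    .body(payload)
    .send()
    .await?;
// On success, the response carries a completion token that can be passed to
// the completion_status() endpoint to wait until the data is fully processed.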

pub fn get_pipeline_logs(&self) -> GetPipelineLogs<'_>

Retrieve logs of a pipeline as a stream

The logs stream catches up to the extent of the internally configured per-pipeline circular logs buffer (limited to a certain byte size and number of lines, whichever is reached first). After the catch-up, new lines are pushed whenever they become available.

It is possible for the logs stream to end prematurely if the API server temporarily loses connection to the runner. In this case, a new request to this endpoint must be issued.

The logs stream will end when the pipeline is deleted, or if the runner restarts. Note that in both cases the logs will be cleared.

Sends a GET request to /v0/pipelines/{pipeline_name}/logs

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_pipeline_logs()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn get_pipeline_metrics(&self) -> GetPipelineMetrics<'_>

Retrieve circuit metrics of a running or paused pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/metrics

Arguments:

  • pipeline_name: Unique pipeline name
  • format
let response = client.get_pipeline_metrics()
    .pipeline_name(pipeline_name)
    .format(format)
    .send()
    .await;

pub fn post_pipeline_pause(&self) -> PostPipelinePause<'_>

Pause the pipeline asynchronously by updating the desired state

The endpoint returns immediately after setting the desired state to Paused. The procedure to get to the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.

Note the following:

  • A stopped pipeline can be started by calling either /start or /pause
  • Both starting as paused and pausing a pipeline are done by calling /pause
  • A pipeline which is in the process of suspending or stopping cannot be paused

Sends a POST request to /v0/pipelines/{pipeline_name}/pause

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.post_pipeline_pause()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn get_program_info(&self) -> GetProgramInfo<'_>

Retrieve the program info of a pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/program_info

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_program_info()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn pipeline_adhoc_sql(&self) -> PipelineAdhocSql<'_>

Execute an ad-hoc SQL query in a running or paused pipeline

The evaluation is not incremental.

Sends a GET request to /v0/pipelines/{pipeline_name}/query

Arguments:

  • pipeline_name: Unique pipeline name
  • format: Input data format, e.g., ‘text’, ‘json’ or ‘parquet’
  • sql: SQL query to execute
let response = client.pipeline_adhoc_sql()
    .pipeline_name(pipeline_name)
    .format(format)
    .sql(sql)
    .send()
    .await;
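For example, a hedged sketch of a one-off aggregate query, assuming an async context and that the setters accept plain strings; the pipeline name, table, and format values are illustrative:

// Run a non-incremental query against the pipeline's current state.
let response = client.pipeline_adhoc_sql()
    .pipeline_name("my-pipeline")
    .format("json")
    .sql("SELECT COUNT(*) FROM people")
    .send()
    .await?;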

pub fn post_pipeline_start(&self) -> PostPipelineStart<'_>

Start the pipeline asynchronously by updating the desired state

The endpoint returns immediately after setting the desired state to Running. The procedure to get to the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.

Note the following:

  • A stopped pipeline can be started by calling either /start or /pause
  • Both starting as running and resuming a pipeline are done by calling /start
  • A pipeline which is in the process of suspending or stopping cannot be started

Sends a POST request to /v0/pipelines/{pipeline_name}/start

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.post_pipeline_start()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn get_pipeline_stats(&self) -> GetPipelineStats<'_>

Retrieve statistics (e.g., performance counters) of a running or paused pipeline

Sends a GET request to /v0/pipelines/{pipeline_name}/stats

Arguments:

  • pipeline_name: Unique pipeline name
let response = client.get_pipeline_stats()
    .pipeline_name(pipeline_name)
    .send()
    .await;

pub fn post_pipeline_stop(&self) -> PostPipelineStop<'_>

Stop the pipeline asynchronously by updating the desired state

There are two variants:

  • /stop?force=false (default): the pipeline will first atomically checkpoint before deprovisioning the compute resources. When resuming, the pipeline will start from this checkpoint.
  • /stop?force=true: the compute resources will be immediately deprovisioned. When resuming, it will pick up the latest checkpoint made by the periodic checkpointer or by a prior /checkpoint call.

The endpoint returns immediately after setting the desired state to Suspended for ?force=false or Stopped for ?force=true. In the former case, once the pipeline has successfully passed the Suspending state, the desired state will become Stopped as well. The procedure to get to the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.

Note the following:

  • The suspension performed by /stop?force=false is not guaranteed to succeed:
      • If an error is returned during the suspension, the pipeline will be forcefully stopped with that error set
      • Otherwise, it will keep trying to suspend, in which case suspension can be cancelled by calling /stop?force=true
  • /stop?force=true cannot be cancelled: the pipeline must first reach Stopped before another action can be done
  • A pipeline which is in the process of suspending or stopping can only be forcefully stopped

Sends a POST request to /v0/pipelines/{pipeline_name}/stop

Arguments:

  • pipeline_name: Unique pipeline name
  • force: The force parameter determines whether to immediately deprovision the pipeline compute resources (force=true) or first attempt to atomically checkpoint before doing so (force=false, which is the default).
let response = client.post_pipeline_stop()
    .pipeline_name(pipeline_name)
    .force(force)
    .send()
    .await;
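A sketch of the two variants, assuming an async context and that the force setter takes a bool; the pipeline name is a placeholder:

// Graceful stop: checkpoint first, then deprovision compute resources.
client.post_pipeline_stop()
    .pipeline_name("my-pipeline")
    .force(false)
    .send()
    .await?;

// Forced stop: deprovision immediately; also cancels an ongoing suspension.
client.post_pipeline_stop()
    .pipeline_name("my-pipeline")
    .force(true)
    .send()
    .await?;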

pub fn completion_token(&self) -> CompletionToken<'_>

Generate a completion token for an input connector

Returns a token that can be passed to the /completion_status endpoint to check whether the pipeline has finished processing all inputs received from the connector before the token was generated.

Sends a GET request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/completion_token

Arguments:

  • pipeline_name: Unique pipeline name
  • table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
  • connector_name: Unique input connector name
let response = client.completion_token()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .send()
    .await;

pub fn get_pipeline_input_connector_status(&self) -> GetPipelineInputConnectorStatus<'_>

Retrieve the status of an input connector

Sends a GET request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats

Arguments:

  • pipeline_name: Unique pipeline name
  • table_name: Unique table name
  • connector_name: Unique input connector name
let response = client.get_pipeline_input_connector_status()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .send()
    .await;

pub fn post_pipeline_input_connector_action(&self) -> PostPipelineInputConnectorAction<'_>

Start (resume) or pause the input connector

The following values of the action argument are accepted: start and pause.

Input connectors can be in either the Running or Paused state. By default, connectors are initialized in the Running state when a pipeline is deployed. In this state, the connector actively fetches data from its configured data source and forwards it to the pipeline. If needed, a connector can be created in the Paused state by setting its paused property to true. When paused, the connector remains idle until reactivated using the start command. Conversely, a connector in the Running state can be paused at any time by issuing the pause command.

The current connector state can be retrieved via the GET /v0/pipelines/{pipeline_name}/stats endpoint.

Note that the input connector is active only if both the pipeline and the connector are in the Running state.

Pipeline state    Connector state    Connector is active?
--------------    ---------------    --------------------
Paused            Paused             No
Paused            Running            No
Running           Paused             No
Running           Running            Yes

Sends a POST request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/{action}

Arguments:

  • pipeline_name: Unique pipeline name
  • table_name: Unique table name
  • connector_name: Unique input connector name
  • action: Input connector action (one of: start, pause)
let response = client.post_pipeline_input_connector_action()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .action(action)
    .send()
    .await;
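For example, pausing and later resuming a connector, assuming an async context and that the setters accept plain strings; the pipeline, table, and connector names are placeholders:

// Pause the connector: the pipeline keeps running, but this connector stops fetching data.
client.post_pipeline_input_connector_action()
    .pipeline_name("my-pipeline")
    .table_name("PEOPLE")
    .connector_name("kafka_in")
    .action("pause")
    .send()
    .await?;

// Resume it later.
client.post_pipeline_input_connector_action()
    .pipeline_name("my-pipeline")
    .table_name("PEOPLE")
    .connector_name("kafka_in")
    .action("start")
    .send()
    .await?;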

pub fn get_pipeline_output_connector_status(&self) -> GetPipelineOutputConnectorStatus<'_>

Retrieve the status of an output connector

Sends a GET request to /v0/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats

Arguments:

  • pipeline_name: Unique pipeline name
  • view_name: Unique SQL view name
  • connector_name: Unique output connector name
let response = client.get_pipeline_output_connector_status()
    .pipeline_name(pipeline_name)
    .view_name(view_name)
    .connector_name(connector_name)
    .send()
    .await;

Trait Implementations§

impl Clone for Client

fn clone(&self) -> Client

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for Client

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl Freeze for Client

impl !RefUnwindSafe for Client

impl Send for Client

impl Sync for Client

impl Unpin for Client

impl !UnwindSafe for Client

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,