pub struct Client { /* private fields */ }
Client for Feldera API
With Feldera, users create data pipelines out of SQL programs. A SQL program comprises tables and views, along with the definitions of input and output connectors for each. A connector defines a data source that feeds input data into tables, or a data sink that receives output data computed by the views.
§Pipeline
The API is centered around the pipeline, which most importantly consists of the SQL program, but also has accompanying metadata and configuration parameters (e.g., compilation profile, number of workers, etc.).
- A pipeline is identified and referred to by its user-provided unique name.
- The pipeline program is asynchronously compiled when the pipeline is first created or when its program is subsequently updated.
- Pipeline deployment is only possible once the program is successfully compiled.
- A pipeline cannot be updated while it is deployed.
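Taken together, a typical flow is to create the pipeline, wait for its program to compile, and then start it. The sketch below assumes a hypothetical pipeline name and a request body built from the generated types; how the compilation status is read back is likewise an assumption about the generated response types.
// Create the pipeline; `body` carries the name, SQL program, and configuration
// (its exact type comes from the generated types module, not shown here).
client.post_pipeline().body(body).send().await?;

// The program compiles asynchronously: poll the pipeline and inspect its
// program/compilation status until it reports success (the exact field name
// depends on the generated response types).
let pipeline = client.get_pipeline()
    .pipeline_name("my_pipeline")   // hypothetical pipeline name
    .send()
    .await?;

// Deployment is only possible once the program compiled successfully.
client.post_pipeline_start().pipeline_name("my_pipeline").send().await?;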
§Concurrency
Each pipeline has a version, which is incremented each time its core fields are updated. The version is monotonically increasing. There is additionally a program version which covers only the program-related core fields, and is used by the compiler to discern when to recompile.
Version: 0.104.0
Implementations§
impl Client
pub fn new(baseurl: &str) -> Self
Create a new client.
baseurl is the base URL provided to the internal reqwest::Client, and should include a scheme and hostname, as well as a port and path stem if applicable.
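For example, assuming a Feldera instance reachable at a placeholder address:
// The base URL below is a placeholder; point it at your own Feldera deployment.
let client = Client::new("http://localhost:8080");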
pub fn new_with_client(baseurl: &str, client: Client) -> Self
Construct a new client with an existing reqwest::Client, allowing more control over its configuration.
baseurl is the base URL provided to the internal reqwest::Client, and should include a scheme and hostname, as well as a port and path stem if applicable.
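A sketch of supplying a pre-configured reqwest::Client; the timeout and base URL are illustrative choices, not defaults:
use std::time::Duration;

// Build a reqwest client with a custom request timeout, then hand it over.
let http = reqwest::Client::builder()
    .timeout(Duration::from_secs(30))
    .build()
    .expect("failed to build reqwest client");
let client = Client::new_with_client("http://localhost:8080", http);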
pub fn api_version(&self) -> &'static str
Get the version of this API.
This string is pulled directly from the source OpenAPI document and may be in any format the API selects.
impl Client
pub fn get_config_authentication(&self) -> GetConfigAuthentication<'_>
Retrieve authentication provider configuration
Sends a GET request to /config/authentication
let response = client.get_config_authentication()
    .send()
    .await;
pub fn list_api_keys(&self) -> ListApiKeys<'_>
Retrieve the list of API keys
Sends a GET request to /v0/api_keys
let response = client.list_api_keys()
    .send()
    .await;
pub fn post_api_key(&self) -> PostApiKey<'_>
Create a new API key
Sends a POST request to /v0/api_keys
Arguments:
- body
let response = client.post_api_key()
    .body(body)
    .send()
    .await;
pub fn get_api_key(&self) -> GetApiKey<'_>
Retrieve an API key
Sends a GET request to /v0/api_keys/{api_key_name}
Arguments:
- api_key_name: Unique API key name
let response = client.get_api_key()
    .api_key_name(api_key_name)
    .send()
    .await;
pub fn delete_api_key(&self) -> DeleteApiKey<'_>
Delete an API key
Sends a DELETE request to /v0/api_keys/{api_key_name}
Arguments:
- api_key_name: Unique API key name
let response = client.delete_api_key()
    .api_key_name(api_key_name)
    .send()
    .await;
pub fn get_config(&self) -> GetConfig<'_>
Retrieve general configuration
Sends a GET request to /v0/config
let response = client.get_config()
    .send()
    .await;
pub fn get_config_demos(&self) -> GetConfigDemos<'_>
Retrieve the list of demos
Sends a GET request to /v0/config/demos
let response = client.get_config_demos()
    .send()
    .await;
pub fn get_metrics(&self) -> GetMetrics<'_>
Retrieve the metrics of all running pipelines belonging to this tenant
The metrics are collected by making an individual HTTP request to the /metrics endpoint of each pipeline; only successful responses are included in the returned list.
Sends a GET request to /v0/metrics
let response = client.get_metrics()
    .send()
    .await;
pub fn list_pipelines(&self) -> ListPipelines<'_>
Retrieve the list of pipelines
Configure which fields are included using the selector query parameter.
Sends a GET request to /v0/pipelines
Arguments:
- selector: The selector parameter limits which fields are returned for a pipeline. Limiting the fields is particularly handy, for instance, when frequently monitoring over low-bandwidth connections while only being interested in pipeline status.
let response = client.list_pipelines()
    .selector(selector)
    .send()
    .await;
pub fn post_pipeline(&self) -> PostPipeline<'_>
Create a new pipeline
Sends a POST request to /v0/pipelines
let response = client.post_pipeline()
    .body(body)
    .send()
    .await;
pub fn get_pipeline(&self) -> GetPipeline<'_>
Retrieve a pipeline
Configure which fields are included using the selector query parameter.
Sends a GET request to /v0/pipelines/{pipeline_name}
Arguments:
- pipeline_name: Unique pipeline name
- selector: The selector parameter limits which fields are returned for a pipeline. Limiting the fields is particularly handy, for instance, when frequently monitoring over low-bandwidth connections while only being interested in pipeline status.
let response = client.get_pipeline()
    .pipeline_name(pipeline_name)
    .selector(selector)
    .send()
    .await;
pub fn put_pipeline(&self) -> PutPipeline<'_>
Fully update a pipeline if it already exists, otherwise create a new pipeline
Sends a PUT request to /v0/pipelines/{pipeline_name}
Arguments:
- pipeline_name: Unique pipeline name
- body
let response = client.put_pipeline()
    .pipeline_name(pipeline_name)
    .body(body)
    .send()
    .await;
pub fn delete_pipeline(&self) -> DeletePipeline<'_>
Delete a pipeline
Sends a DELETE request to /v0/pipelines/{pipeline_name}
Arguments:
- pipeline_name: Unique pipeline name
let response = client.delete_pipeline()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn patch_pipeline(&self) -> PatchPipeline<'_>
Partially update a pipeline
Sends a PATCH request to /v0/pipelines/{pipeline_name}
Arguments:
- pipeline_name: Unique pipeline name
- body
let response = client.patch_pipeline()
    .pipeline_name(pipeline_name)
    .body(body)
    .send()
    .await;
pub fn checkpoint_pipeline(&self) -> CheckpointPipeline<'_>
Initiate a checkpoint for a running or paused pipeline
Returns a checkpoint sequence number that can be used with /checkpoint_status to determine when the checkpoint has completed.
Sends a POST request to /v0/pipelines/{pipeline_name}/checkpoint
Arguments:
- pipeline_name: Unique pipeline name
let response = client.checkpoint_pipeline()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn get_checkpoint_status(&self) -> GetCheckpointStatus<'_>
Retrieve status of checkpoint activity in a pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/checkpoint_status
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_checkpoint_status()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn get_pipeline_circuit_profile(&self) -> GetPipelineCircuitProfile<'_>
Retrieve the circuit performance profile of a running or paused pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/circuit_profile
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_pipeline_circuit_profile()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn post_pipeline_clear(&self) -> PostPipelineClear<'_>
Clear the pipeline storage asynchronously
IMPORTANT: Clearing means disassociating the storage from the pipeline. Depending on the storage type, this can include its deletion.
It sets the storage state to Clearing, after which the clearing process is performed asynchronously. Progress should be monitored by polling the pipeline using the GET endpoints. A /clear cannot be cancelled.
Sends a POST request to /v0/pipelines/{pipeline_name}/clear
Arguments:
- pipeline_name: Unique pipeline name
let response = client.post_pipeline_clear()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn completion_status(&self) -> CompletionStatus<'_>
Check the status of a completion token returned by the /ingress or /completion_token endpoint
Sends a GET request to /v0/pipelines/{pipeline_name}/completion_status
Arguments:
- pipeline_name: Unique pipeline name
- token: Completion token returned by the /ingress or /completion_token endpoint
let response = client.completion_status()
    .pipeline_name(pipeline_name)
    .token(token)
    .send()
    .await;
pub fn http_output(&self) -> HttpOutput<'_>
Subscribe to a stream of updates from a SQL view or table
The pipeline responds with a continuous stream of changes to the specified table or view, encoded using the format specified in the ?format= parameter. Updates are split into Chunks.
The pipeline continues sending updates until the client closes the connection or the pipeline is stopped.
Sends a POST request to /v0/pipelines/{pipeline_name}/egress/{table_name}
Arguments:
- pipeline_name: Unique pipeline name
- table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
- array: Set to true to group updates in this stream into JSON arrays (used in conjunction with format=json). The default value is false.
- backpressure: Apply backpressure on the pipeline when the HTTP client cannot receive data fast enough. When this flag is set to false (the default), the HTTP connector drops data chunks if the client is not keeping up with its output. This prevents a slow HTTP client from slowing down the entire pipeline. When the flag is set to true, the connector waits for the client to receive each chunk and blocks the pipeline if the client cannot keep up.
- format: Output data format, e.g., 'csv' or 'json'.
let response = client.http_output()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .array(array)
    .backpressure(backpressure)
    .format(format)
    .send()
    .await;
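A concrete invocation sketch with illustrative parameter values; the pipeline and table names are placeholders, and the value accepted by .format() depends on the generated types:
// Stream JSON changes from a view, blocking the pipeline if this client falls behind.
let response = client.http_output()
    .pipeline_name("my_pipeline")   // hypothetical pipeline name
    .table_name("daily_totals")     // hypothetical table or view name
    .array(true)                    // wrap updates in JSON arrays
    .backpressure(true)             // wait for this client instead of dropping chunks
    .format("json")
    .send()
    .await;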
pub fn get_pipeline_heap_profile(&self) -> GetPipelineHeapProfile<'_>
Retrieve the heap profile of a running or paused pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/heap_profile
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_pipeline_heap_profile()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn http_input(&self) -> HttpInput<'_>
Push data to a SQL table
The client sends data encoded using the format specified in the ?format= parameter as the body of the request. The contents of the data must match the SQL table schema specified in table_name.
The pipeline ingests data as it arrives without waiting for the end of the request. A successful HTTP response indicates that all data has been ingested successfully.
On success, returns a completion token that can be passed to the /completion_status endpoint to check whether the pipeline has fully processed the data.
Sends a POST request to /v0/pipelines/{pipeline_name}/ingress/{table_name}
Arguments:
- pipeline_name: Unique pipeline name
- table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
- array: Set to true if updates in this stream are packaged into JSON arrays (used in conjunction with format=json). The default value is false.
- force: When true, push data to the pipeline even if the pipeline is paused. The default value is false.
- format: Input data format, e.g., 'csv' or 'json'.
- update_format: JSON data change event format (used in conjunction with format=json). The default value is 'insert_delete'.
- body: Input data in the specified format
let response = client.http_input()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .array(array)
    .force(force)
    .format(format)
    .update_format(update_format)
    .body(body)
    .send()
    .await;
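A sketch of the ingest-then-wait pattern described above. The pipeline and table names are placeholders, and the way the completion token is read out of the response is an assumption about the generated response types:
// Push a batch of JSON rows and obtain a completion token.
let ingest = client.http_input()
    .pipeline_name("my_pipeline")        // hypothetical pipeline name
    .table_name("orders")                // hypothetical table name
    .format("json")
    .body(body)
    .send()
    .await?;
let token = ingest.into_inner().token;   // illustrative accessor; depends on generated types

// Ask the pipeline whether the pushed data has been fully processed;
// repeat the call until it reports completion.
let status = client.completion_status()
    .pipeline_name("my_pipeline")
    .token(token)
    .send()
    .await?;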
pub fn get_pipeline_logs(&self) -> GetPipelineLogs<'_>
Retrieve logs of a pipeline as a stream
The logs stream first catches up to the extent of the internally configured per-pipeline circular logs buffer (limited to a certain byte size and number of lines, whichever is reached first). After the catch-up, new lines are pushed whenever they become available.
It is possible for the logs stream to end prematurely if the API server temporarily loses its connection to the runner. In that case, issue a new request to this endpoint.
The logs stream will end when the pipeline is deleted, or if the runner restarts. Note that in both cases the logs will be cleared.
Sends a GET request to /v0/pipelines/{pipeline_name}/logs
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_pipeline_logs()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn get_pipeline_metrics(&self) -> GetPipelineMetrics<'_>
Retrieve circuit metrics of a running or paused pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/metrics
Arguments:
- pipeline_name: Unique pipeline name
- format
let response = client.get_pipeline_metrics()
    .pipeline_name(pipeline_name)
    .format(format)
    .send()
    .await;
pub fn post_pipeline_pause(&self) -> PostPipelinePause<'_>
Pause the pipeline asynchronously by updating the desired state
The endpoint returns immediately after setting the desired state to Paused. The procedure to reach the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.
Note the following:
- A stopped pipeline can be started by calling either /start or /pause
- Both starting a pipeline as paused and pausing it are done by calling /pause
- A pipeline which is in the process of suspending or stopping cannot be paused
Sends a POST request to /v0/pipelines/{pipeline_name}/pause
Arguments:
- pipeline_name: Unique pipeline name
let response = client.post_pipeline_pause()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn get_program_info(&self) -> GetProgramInfo<'_>
Retrieve the program info of a pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/program_info
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_program_info()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn pipeline_adhoc_sql(&self) -> PipelineAdhocSql<'_>
Execute an ad-hoc SQL query in a running or paused pipeline
The evaluation is not incremental.
Sends a GET request to /v0/pipelines/{pipeline_name}/query
Arguments:
- pipeline_name: Unique pipeline name
- format: Input data format, e.g., 'text', 'json' or 'parquet'
- sql: SQL query to execute
let response = client.pipeline_adhoc_sql()
    .pipeline_name(pipeline_name)
    .format(format)
    .sql(sql)
    .send()
    .await;
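A concrete invocation sketch; the pipeline name and query are placeholders, and the exact type accepted by .format() comes from the generated types:
// Run a one-off, non-incremental query against the pipeline's state.
let response = client.pipeline_adhoc_sql()
    .pipeline_name("my_pipeline")              // hypothetical pipeline name
    .format("json")                            // e.g., 'text', 'json' or 'parquet'
    .sql("SELECT COUNT(*) FROM orders")        // hypothetical table
    .send()
    .await;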
pub fn post_pipeline_start(&self) -> PostPipelineStart<'_>
Start the pipeline asynchronously by updating the desired state
The endpoint returns immediately after setting the desired state to Running. The procedure to reach the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.
Note the following:
- A stopped pipeline can be started by calling either /start or /pause
- Both starting a pipeline as running and resuming it are done by calling /start
- A pipeline which is in the process of suspending or stopping cannot be started
Sends a POST request to /v0/pipelines/{pipeline_name}/start
Arguments:
- pipeline_name: Unique pipeline name
let response = client.post_pipeline_start()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn get_pipeline_stats(&self) -> GetPipelineStats<'_>
Retrieve statistics (e.g., performance counters) of a running or paused pipeline
Sends a GET request to /v0/pipelines/{pipeline_name}/stats
Arguments:
- pipeline_name: Unique pipeline name
let response = client.get_pipeline_stats()
    .pipeline_name(pipeline_name)
    .send()
    .await;
pub fn post_pipeline_stop(&self) -> PostPipelineStop<'_>
Stop the pipeline asynchronously by updating the desired state
There are two variants:
- /stop?force=false (default): the pipeline will first atomically checkpoint before deprovisioning the compute resources. When resuming, the pipeline will start from this checkpoint.
- /stop?force=true: the compute resources will be immediately deprovisioned. When resuming, it will pick up the latest checkpoint made by the periodic checkpointer or by a prior /checkpoint call.
The endpoint returns immediately after setting the desired state to Suspended for ?force=false or Stopped for ?force=true. In the former case, once the pipeline has successfully passed the Suspending state, the desired state will become Stopped as well.
The procedure to reach the desired state is performed asynchronously. Progress should be monitored by polling the pipeline GET endpoints.
Note the following:
- The suspending that is done with /stop?force=false is not guaranteed to succeed:
  - If an error is returned during the suspension, the pipeline will be forcefully stopped with that error set
  - Otherwise, it will keep trying to suspend, in which case it is possible to cancel suspending by calling /stop?force=true
- /stop?force=true cannot be cancelled: the pipeline must first reach Stopped before another action can be done
- A pipeline which is in the process of suspending or stopping can only be forcefully stopped
Sends a POST request to /v0/pipelines/{pipeline_name}/stop
Arguments:
- pipeline_name: Unique pipeline name
- force: The force parameter determines whether to immediately deprovision the pipeline compute resources (force=true) or first attempt to atomically checkpoint before doing so (force=false, which is the default).
let response = client.post_pipeline_stop()
    .pipeline_name(pipeline_name)
    .force(force)
    .send()
    .await;
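For illustration, both variants side by side (the pipeline name is a placeholder):
// Graceful stop: checkpoint first, then deprovision the compute resources.
let response = client.post_pipeline_stop()
    .pipeline_name("my_pipeline")   // hypothetical pipeline name
    .force(false)
    .send()
    .await;

// Forced stop: deprovision immediately; a later resume picks up the latest
// existing checkpoint.
let response = client.post_pipeline_stop()
    .pipeline_name("my_pipeline")
    .force(true)
    .send()
    .await;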
pub fn completion_token(&self) -> CompletionToken<'_>
Generate a completion token for an input connector
Returns a token that can be passed to the /completion_status endpoint to check whether the pipeline has finished processing all inputs received from the connector before the token was generated.
Sends a GET request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/completion_token
Arguments:
- pipeline_name: Unique pipeline name
- table_name: SQL table name. Unquoted SQL names have to be capitalized. Quoted SQL names have to exactly match the case from the SQL program.
- connector_name: Unique input connector name
let response = client.completion_token()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .send()
    .await;
pub fn get_pipeline_input_connector_status(&self) -> GetPipelineInputConnectorStatus<'_>
Retrieve the status of an input connector
Sends a GET request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats
Arguments:
- pipeline_name: Unique pipeline name
- table_name: Unique table name
- connector_name: Unique input connector name
let response = client.get_pipeline_input_connector_status()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .send()
    .await;
pub fn post_pipeline_input_connector_action(&self) -> PostPipelineInputConnectorAction<'_>
Start (resume) or pause the input connector
The following values of the action argument are accepted: start and pause.
Input connectors can be in either the Running or Paused state. By default, connectors are initialized in the Running state when a pipeline is deployed. In this state, the connector actively fetches data from its configured data source and forwards it to the pipeline. If needed, a connector can be created in the Paused state by setting its paused property to true. When paused, the connector remains idle until reactivated using the start command. Conversely, a connector in the Running state can be paused at any time by issuing the pause command.
The current connector state can be retrieved via the GET /v0/pipelines/{pipeline_name}/stats endpoint.
Note that the input connector is only active if both the pipeline and the connector are in the Running state.

Pipeline state   Connector state   Connector is active?
--------------   ---------------   --------------------
Paused           Paused            No
Paused           Running           No
Running          Paused            No
Running          Running           Yes

Sends a POST request to /v0/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/{action}
Arguments:
- pipeline_name: Unique pipeline name
- table_name: Unique table name
- connector_name: Unique input connector name
- action: Input connector action (one of: start, pause)
let response = client.post_pipeline_input_connector_action()
    .pipeline_name(pipeline_name)
    .table_name(table_name)
    .connector_name(connector_name)
    .action(action)
    .send()
    .await;
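A sketch of pausing and then resuming a connector; the names are placeholders, and the exact type accepted by .action() (a string vs. a generated enum) comes from the generated types:
// Temporarily stop pulling data from the connector's source.
let response = client.post_pipeline_input_connector_action()
    .pipeline_name("my_pipeline")   // hypothetical pipeline name
    .table_name("orders")           // hypothetical table name
    .connector_name("kafka_in")     // hypothetical connector name
    .action("pause")
    .send()
    .await;

// Resume fetching data from the source.
let response = client.post_pipeline_input_connector_action()
    .pipeline_name("my_pipeline")
    .table_name("orders")
    .connector_name("kafka_in")
    .action("start")
    .send()
    .await;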
pub fn get_pipeline_output_connector_status(&self) -> GetPipelineOutputConnectorStatus<'_>
Retrieve the status of an output connector
Sends a GET request to /v0/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats
Arguments:
- pipeline_name: Unique pipeline name
- view_name: Unique SQL view name
- connector_name: Unique output connector name
let response = client.get_pipeline_output_connector_status()
    .pipeline_name(pipeline_name)
    .view_name(view_name)
    .connector_name(connector_name)
    .send()
    .await;