Struct arrow_flight::client::FlightClient
pub struct FlightClient { /* private fields */ }
A “mid level” Apache Arrow Flight client.
FlightClient is intended as a convenience for interactions with Arrow Flight servers. For more direct control, such as access to the response headers, use FlightServiceClient directly via methods such as Self::inner or Self::into_inner.
Example:
use arrow_flight::client::FlightClient;
use bytes::Bytes;
use tonic::transport::Channel;
let channel = Channel::from_static("http://localhost:1234")
    .connect()
    .await
    .expect("error connecting");
let mut client = FlightClient::new(channel);
// Send 'Hi' bytes as the handshake request to the server
let response = client
    .handshake(Bytes::from("Hi"))
    .await
    .expect("error handshaking");
// Expect that the server responded with 'Ho'
assert_eq!(response, Bytes::from("Ho"));
Implementations
impl FlightClient
pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self
Creates a new higher level client with the provided lower level client
pub fn metadata(&self) -> &MetadataMap
Return a reference to gRPC metadata included with each request
pub fn metadata_mut(&mut self) -> &mut MetadataMap
Return a mutable reference to the gRPC metadata included with each request.
These headers can be used, for example, to include authorization or other application-specific headers.
pub fn add_header(&mut self, key: &str, value: &str) -> Result<()>
Add the specified header with value to all subsequent requests. See Self::metadata_mut for fine-grained control.
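The "all subsequent requests" behavior can be pictured with a small std-only model. This is a sketch under assumptions, not the arrow_flight implementation: DefaultHeaders and headers_for_request are hypothetical names, and a BTreeMap stands in for the gRPC metadata map. In this model a header is attached to every request built after it was added, and adding the same key again replaces the earlier value.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the per-client header storage
/// (not the arrow_flight or tonic type).
#[derive(Debug, Default)]
pub struct DefaultHeaders {
    headers: BTreeMap<String, String>,
}

impl DefaultHeaders {
    /// Remember a header so that it is copied onto every later request.
    /// Re-adding an existing key overwrites its value in this model.
    pub fn add_header(&mut self, key: &str, value: &str) {
        self.headers.insert(key.to_string(), value.to_string());
    }

    /// Build the header list for one outgoing request: a snapshot of
    /// the headers stored so far, in key order.
    pub fn headers_for_request(&self) -> Vec<(String, String)> {
        self.headers
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }
}
```

A request built before the call to add_header carries no header in this model; only requests built afterwards do.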
pub fn inner(&self) -> &FlightServiceClient<Channel>
Return a reference to the underlying tonic FlightServiceClient
pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel>
Return a mutable reference to the underlying tonic FlightServiceClient
pub fn into_inner(self) -> FlightServiceClient<Channel>
Consume this client and return the underlying tonic FlightServiceClient
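The accessors inner, inner_mut and into_inner follow a common Rust wrapper pattern: borrow the wrapped value, borrow it mutably, or give it up entirely. The sketch below shows that pattern with std types only. LowLevel and MidLevel are hypothetical stand-ins for FlightServiceClient and FlightClient, and max_message_size is an invented field used only to make the mutation visible.

```rust
/// Hypothetical stand-in for the lower level client.
#[derive(Debug, PartialEq)]
pub struct LowLevel {
    pub max_message_size: usize,
}

/// Hypothetical stand-in for the mid level wrapper.
#[derive(Debug)]
pub struct MidLevel {
    inner: LowLevel,
}

impl MidLevel {
    /// Wrap an already configured lower level client.
    pub fn new_from_inner(inner: LowLevel) -> Self {
        Self { inner }
    }

    /// Shared borrow: inspect the wrapped client.
    pub fn inner(&self) -> &LowLevel {
        &self.inner
    }

    /// Mutable borrow: reconfigure the wrapped client in place.
    pub fn inner_mut(&mut self) -> &mut LowLevel {
        &mut self.inner
    }

    /// Consume the wrapper and hand back the wrapped client.
    pub fn into_inner(self) -> LowLevel {
        self.inner
    }
}
```

After into_inner the wrapper no longer exists, so any state it held on top of the wrapped client is dropped in this model.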
pub async fn handshake(&mut self, payload: impl Into<Bytes>) -> Result<Bytes>
Perform an Arrow Flight handshake with the server, sending payload as the HandshakeRequest payload and returning the HandshakeResponse bytes returned from the server.
See FlightClient docs for an example.
pub async fn do_get(&mut self, ticket: Ticket) -> Result<FlightRecordBatchStream>
Make a DoGet call to the server with the provided ticket, returning a FlightRecordBatchStream for reading RecordBatches.
Note
To access the returned FlightData, use FlightRecordBatchStream::into_inner()
Example:
let mut client = FlightClient::new(channel);
// Invoke a do_get request on the server with a previously
// received Ticket
let response = client
    .do_get(ticket)
    .await
    .expect("error invoking do_get");
// Use try_collect to get the RecordBatches from the server
let batches: Vec<RecordBatch> = response
    .try_collect()
    .await
    .expect("no stream errors");
pub async fn get_flight_info(
&mut self,
descriptor: FlightDescriptor
) -> Result<FlightInfo>
Make a GetFlightInfo call to the server with the provided FlightDescriptor and return the FlightInfo from the server. The FlightInfo can be used with Self::do_get to retrieve the requested batches.
Example:
let mut client = FlightClient::new(channel);
// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_info = client
    .get_flight_info(request)
    .await
    .expect("error getting flight info");
// Retrieve the first endpoint from the returned flight info
let ticket = flight_info
    .endpoint[0]
    // Extract the ticket
    .ticket
    .clone()
    .expect("expected ticket");
// Retrieve the corresponding RecordBatch stream with do_get
let data = client
    .do_get(ticket)
    .await
    .expect("error fetching data");
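The example above reads only endpoint[0] and panics if the endpoint list is empty or the ticket is absent. Since the endpoint field is a list and each ticket is optional, a caller may prefer to gather every ticket before calling do_get. The sketch below shows that iteration with std types only: Ticket, Endpoint and Info are hypothetical mirrors that copy just the endpoint and ticket field shapes used above, and all_tickets is not part of arrow_flight.

```rust
/// Hypothetical mirror of a ticket: an opaque byte payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Ticket(pub Vec<u8>);

/// Hypothetical mirror of an endpoint with an optional ticket.
#[derive(Debug, Clone)]
pub struct Endpoint {
    pub ticket: Option<Ticket>,
}

/// Hypothetical mirror of the flight info returned by the server.
#[derive(Debug, Clone)]
pub struct Info {
    pub endpoint: Vec<Endpoint>,
}

/// Collect the ticket of every endpoint. If an endpoint has no ticket,
/// return the index of the first such endpoint as the error.
pub fn all_tickets(info: &Info) -> Result<Vec<Ticket>, usize> {
    info.endpoint
        .iter()
        .enumerate()
        .map(|(index, endpoint)| endpoint.ticket.clone().ok_or(index))
        .collect()
}
```

Each collected ticket would then be passed to do_get in turn. An empty endpoint list yields an empty Vec rather than a panic.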
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
&mut self,
request: S
) -> Result<BoxStream<'static, Result<PutResult>>>
Make a DoPut call to the server with the provided Stream of FlightData, returning a stream of PutResult.
Example:
let mut client = FlightClient::new(channel);
// Encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(futures::stream::iter(vec![Ok(batch)]))
    // The data encoder returns Results, but do_put requires FlightData
    .map(|batch| batch.unwrap());
// Send the stream and get the results as `PutResult`
let response: Vec<PutResult> = client
    .do_put(flight_data_stream)
    .await
    .unwrap()
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error calling do_put");
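The unwrap inside the map step of the example above panics if encoding a batch fails. One alternative, sketched here with std types only, is to gather the encoder output into a single Result before sending anything, so that an encoding error is reported to the caller instead. Frame and EncodeError are hypothetical stand-ins for FlightData and the encoder's error type, and frames_or_first_error is not part of arrow_flight.

```rust
/// Hypothetical stand-in for one encoded message.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame(pub Vec<u8>);

/// Hypothetical stand-in for an encoding failure.
#[derive(Debug, Clone, PartialEq)]
pub struct EncodeError(pub String);

/// Turn a sequence of fallible items into either all frames or the
/// first error. Collecting into `Result<Vec<_>, _>` stops consuming
/// the input at the first `Err` it sees.
pub fn frames_or_first_error<I>(encoded: I) -> Result<Vec<Frame>, EncodeError>
where
    I: IntoIterator<Item = Result<Frame, EncodeError>>,
{
    encoded.into_iter().collect()
}
```

With the real types the same shape could probably be reached by collecting the encoder stream with try_collect before calling do_put. The trade-off is that every message is buffered in memory first, which may not suit large uploads.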
pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>(
&mut self,
request: S
) -> Result<FlightRecordBatchStream>
Make a DoExchange call to the server with the provided Stream of FlightData, returning a FlightRecordBatchStream for reading the RecordBatches sent back by the server.
Example:
let mut client = FlightClient::new(channel);
// Encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(futures::stream::iter(vec![Ok(batch)]))
    // The data encoder returns Results, but do_exchange requires FlightData
    .map(|batch| batch.unwrap());
// Send the stream and get the results as `RecordBatches`
let response: Vec<RecordBatch> = client
    .do_exchange(flight_data_stream)
    .await
    .unwrap()
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error calling do_exchange");
pub async fn list_flights(
&mut self,
expression: impl Into<Bytes>
) -> Result<BoxStream<'static, Result<FlightInfo>>>
Make a ListFlights call to the server with the provided criteria, returning a Stream of FlightInfo.
Example:
let mut client = FlightClient::new(channel);
// Send 'Name=Foo' bytes as the "expression" to the server
// and gather the returned FlightInfo
let responses: Vec<FlightInfo> = client
    .list_flights(Bytes::from("Name=Foo"))
    .await
    .expect("error listing flights")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering flights");
pub async fn get_schema(
&mut self,
flight_descriptor: FlightDescriptor
) -> Result<Schema>
Make a GetSchema call to the server with the provided FlightDescriptor, returning the associated Schema.
Example:
let mut client = FlightClient::new(channel);
// Request the schema result of a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let schema: Schema = client
    .get_schema(request)
    .await
    .expect("error making request");
pub async fn list_actions(
&mut self
) -> Result<BoxStream<'static, Result<ActionType>>>
Make a ListActions call to the server, returning a Stream of ActionType.
Example:
let mut client = FlightClient::new(channel);
// List available actions on the server:
let actions: Vec<ActionType> = client
    .list_actions()
    .await
    .expect("error listing actions")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering actions");
pub async fn do_action(
&mut self,
action: Action
) -> Result<BoxStream<'static, Result<Bytes>>>
Make a DoAction call to the server, returning a Stream of opaque Bytes.
Example:
let mut client = FlightClient::new(channel);
let request = Action::new("my_action", "the body");
// Make a request to run the action on the server
let results: Vec<Bytes> = client
    .do_action(request)
    .await
    .expect("error executing action")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering action results");
Trait Implementations
Auto Trait Implementations
impl !RefUnwindSafe for FlightClient
impl Send for FlightClient
impl Sync for FlightClient
impl Unpin for FlightClient
impl !UnwindSafe for FlightClient
Blanket Implementations
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request