arrow_flight/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html)
19//! for exchanging [Arrow](https://arrow.apache.org) data between processes.
20//!
21//! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight)
22//! page for feature flags and more information.
23//!
24//! # Overview
25//!
26//! This crate contains:
27//!
28//! 1. Low level [prost] generated structs
29//!    for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
30//!    [`Location`] and [`Ticket`].
31//!
32//! 2. Low level [tonic] generated [`flight_service_client`] and
33//!    [`flight_service_server`].
34//!
35//! 3. Experimental support for [Flight SQL] in [`sql`]. Requires the
36//!    `flight-sql-experimental` feature of this crate to be activated.
37//!
38//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
39#![allow(rustdoc::invalid_html_tags)]
40#![warn(missing_docs)]
41
42use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
43use arrow_schema::{ArrowError, Schema};
44
45use arrow_ipc::convert::try_schema_from_ipc_buffer;
46use base64::prelude::BASE64_STANDARD;
47use base64::Engine;
48use bytes::Bytes;
49use prost_types::Timestamp;
50use std::{fmt, ops::Deref};
51
52type ArrowResult<T> = std::result::Result<T, ArrowError>;
53
// Private module holding the generated Flight protocol code. Selected items
// are re-exported from the crate root and from the small public modules below.
#[allow(clippy::all)]
mod gen {
    // Since this file is auto-generated, we suppress all warnings
    #![allow(missing_docs)]
    // Textually pulls in `arrow.flight.protocol.rs`, resolved relative to this file.
    include!("arrow.flight.protocol.rs");
}
60
61/// Defines a `Flight` for generation or retrieval.
62pub mod flight_descriptor {
63    use super::gen;
64    pub use gen::flight_descriptor::DescriptorType;
65}
66
67/// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient).
68pub mod flight_service_client {
69    use super::gen;
70    pub use gen::flight_service_client::FlightServiceClient;
71}
72
73/// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer)
74/// and [`FlightService`](gen::flight_service_server::FlightService).
75pub mod flight_service_server {
76    use super::gen;
77    pub use gen::flight_service_server::FlightService;
78    pub use gen::flight_service_server::FlightServiceServer;
79}
80
81/// Mid Level [`FlightClient`]
82pub mod client;
83pub use client::FlightClient;
84
85/// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams.
86/// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream).
87pub mod decode;
88
89/// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams.
90/// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder).
91pub mod encode;
92
93/// Common error types
94pub mod error;
95
96pub use gen::Action;
97pub use gen::ActionType;
98pub use gen::BasicAuth;
99pub use gen::CancelFlightInfoRequest;
100pub use gen::CancelFlightInfoResult;
101pub use gen::CancelStatus;
102pub use gen::Criteria;
103pub use gen::Empty;
104pub use gen::FlightData;
105pub use gen::FlightDescriptor;
106pub use gen::FlightEndpoint;
107pub use gen::FlightInfo;
108pub use gen::HandshakeRequest;
109pub use gen::HandshakeResponse;
110pub use gen::Location;
111pub use gen::PollInfo;
112pub use gen::PutResult;
113pub use gen::RenewFlightEndpointRequest;
114pub use gen::Result;
115pub use gen::SchemaResult;
116pub use gen::Ticket;
117
118/// Helper to extract HTTP/gRPC trailers from a tonic stream.
119mod trailers;
120
121pub mod utils;
122
123#[cfg(feature = "flight-sql-experimental")]
124pub mod sql;
125mod streams;
126
127use flight_descriptor::DescriptorType;
128
/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
///
/// It is the input type of the `From` / `TryFrom` conversions in this crate
/// that encode a schema into [`FlightData`], [`SchemaResult`] or
/// [`IpcMessage`]. It dereferences to the inner `(schema, options)` tuple.
pub struct SchemaAsIpc<'a> {
    /// Data type representing a schema and its IPC write options
    /// (borrowed schema first, borrowed write options second).
    pub pair: (&'a Schema, &'a IpcWriteOptions),
}
134
/// IpcMessage represents a `Schema` in the format expected in
/// `FlightInfo.schema`
///
/// The wrapped [`Bytes`] are exposed as a byte slice through `Deref`, and the
/// message can be converted back into a [`Schema`] with `TryFrom`.
#[derive(Debug)]
pub struct IpcMessage(pub Bytes);
139
140// Useful conversion functions
141
142fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
143    let data_gen = writer::IpcDataGenerator::default();
144    let mut dict_tracker =
145        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
146    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
147}
148
149fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
150    let encoded_data = flight_schema_as_encoded_data(schema, options);
151    IpcMessage(encoded_data.ipc_message.into())
152}
153
154// Implement a bunch of useful traits for various conversions, displays,
155// etc...
156
157// Deref
158
159impl Deref for IpcMessage {
160    type Target = [u8];
161
162    fn deref(&self) -> &Self::Target {
163        &self.0
164    }
165}
166
167impl<'a> Deref for SchemaAsIpc<'a> {
168    type Target = (&'a Schema, &'a IpcWriteOptions);
169
170    fn deref(&self) -> &Self::Target {
171        &self.pair
172    }
173}
174
175// Display...
176
/// Writes the `Debug` form of at most the first `limit` bytes of `value`.
///
/// Inputs longer than `limit` are cut off silently; no truncation marker is
/// written.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    let shown = &value[..value.len().min(limit)];
    write!(f, "{shown:?}")
}
185
186impl fmt::Display for FlightData {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        write!(f, "FlightData {{")?;
189        write!(f, " descriptor: ")?;
190        match &self.flight_descriptor {
191            Some(d) => write!(f, "{d}")?,
192            None => write!(f, "None")?,
193        };
194        write!(f, ", header: ")?;
195        limited_fmt(f, &self.data_header, 8)?;
196        write!(f, ", metadata: ")?;
197        limited_fmt(f, &self.app_metadata, 8)?;
198        write!(f, ", body: ")?;
199        limited_fmt(f, &self.data_body, 8)?;
200        write!(f, " }}")
201    }
202}
203
204impl fmt::Display for FlightDescriptor {
205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206        write!(f, "FlightDescriptor {{")?;
207        write!(f, " type: ")?;
208        match self.r#type() {
209            DescriptorType::Cmd => {
210                write!(f, "cmd, value: ")?;
211                limited_fmt(f, &self.cmd, 8)?;
212            }
213            DescriptorType::Path => {
214                write!(f, "path: [")?;
215                let mut sep = "";
216                for element in &self.path {
217                    write!(f, "{sep}{element}")?;
218                    sep = ", ";
219                }
220                write!(f, "]")?;
221            }
222            DescriptorType::Unknown => {
223                write!(f, "unknown")?;
224            }
225        }
226        write!(f, " }}")
227    }
228}
229
230impl fmt::Display for FlightEndpoint {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        write!(f, "FlightEndpoint {{")?;
233        write!(f, " ticket: ")?;
234        match &self.ticket {
235            Some(value) => write!(f, "{value}"),
236            None => write!(f, " None"),
237        }?;
238        write!(f, ", location: [")?;
239        let mut sep = "";
240        for location in &self.location {
241            write!(f, "{sep}{location}")?;
242            sep = ", ";
243        }
244        write!(f, "]")?;
245        write!(f, ", expiration_time:")?;
246        match &self.expiration_time {
247            Some(value) => write!(f, " {value}"),
248            None => write!(f, " None"),
249        }?;
250        write!(f, ", app_metadata: ")?;
251        limited_fmt(f, &self.app_metadata, 8)?;
252        write!(f, " }}")
253    }
254}
255
256impl fmt::Display for FlightInfo {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        let ipc_message = IpcMessage(self.schema.clone());
259        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
260        write!(f, "FlightInfo {{")?;
261        write!(f, " schema: {schema}")?;
262        write!(f, ", descriptor:")?;
263        match &self.flight_descriptor {
264            Some(d) => write!(f, " {d}"),
265            None => write!(f, " None"),
266        }?;
267        write!(f, ", endpoint: [")?;
268        let mut sep = "";
269        for endpoint in &self.endpoint {
270            write!(f, "{sep}{endpoint}")?;
271            sep = ", ";
272        }
273        write!(f, "], total_records: {}", self.total_records)?;
274        write!(f, ", total_bytes: {}", self.total_bytes)?;
275        write!(f, ", ordered: {}", self.ordered)?;
276        write!(f, ", app_metadata: ")?;
277        limited_fmt(f, &self.app_metadata, 8)?;
278        write!(f, " }}")
279    }
280}
281
282impl fmt::Display for PollInfo {
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        write!(f, "PollInfo {{")?;
285        write!(f, " info:")?;
286        match &self.info {
287            Some(value) => write!(f, " {value}"),
288            None => write!(f, " None"),
289        }?;
290        write!(f, ", descriptor:")?;
291        match &self.flight_descriptor {
292            Some(d) => write!(f, " {d}"),
293            None => write!(f, " None"),
294        }?;
295        write!(f, ", progress:")?;
296        match &self.progress {
297            Some(value) => write!(f, " {value}"),
298            None => write!(f, " None"),
299        }?;
300        write!(f, ", expiration_time:")?;
301        match &self.expiration_time {
302            Some(value) => write!(f, " {value}"),
303            None => write!(f, " None"),
304        }?;
305        write!(f, " }}")
306    }
307}
308
309impl fmt::Display for CancelFlightInfoRequest {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        write!(f, "CancelFlightInfoRequest {{")?;
312        write!(f, " info: ")?;
313        match &self.info {
314            Some(value) => write!(f, "{value}")?,
315            None => write!(f, "None")?,
316        };
317        write!(f, " }}")
318    }
319}
320
321impl fmt::Display for CancelFlightInfoResult {
322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323        write!(f, "CancelFlightInfoResult {{")?;
324        write!(f, " status: {}", self.status().as_str_name())?;
325        write!(f, " }}")
326    }
327}
328
329impl fmt::Display for RenewFlightEndpointRequest {
330    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331        write!(f, "RenewFlightEndpointRequest {{")?;
332        write!(f, " endpoint: ")?;
333        match &self.endpoint {
334            Some(value) => write!(f, "{value}")?,
335            None => write!(f, "None")?,
336        };
337        write!(f, " }}")
338    }
339}
340
341impl fmt::Display for Location {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        write!(f, "Location {{")?;
344        write!(f, " uri: ")?;
345        write!(f, "{}", self.uri)
346    }
347}
348
349impl fmt::Display for Ticket {
350    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351        write!(f, "Ticket {{")?;
352        write!(f, " ticket: ")?;
353        write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
354    }
355}
356
357// From...
358
359impl From<EncodedData> for FlightData {
360    fn from(data: EncodedData) -> Self {
361        FlightData {
362            data_header: data.ipc_message.into(),
363            data_body: data.arrow_data.into(),
364            ..Default::default()
365        }
366    }
367}
368
369impl From<SchemaAsIpc<'_>> for FlightData {
370    fn from(schema_ipc: SchemaAsIpc) -> Self {
371        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
372        FlightData {
373            data_header: vals,
374            ..Default::default()
375        }
376    }
377}
378
379impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
380    type Error = ArrowError;
381
382    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
383        // According to the definition from `Flight.proto`
384        // The schema of the dataset in its IPC form:
385        //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
386        //   4 bytes - the byte length of the payload
387        //   a flatbuffer Message whose header is the Schema
388        let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
389        Ok(SchemaResult { schema: vals })
390    }
391}
392
393impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
394    type Error = ArrowError;
395
396    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
397        schema_to_ipc_format(schema_ipc)
398    }
399}
400
401fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
402    let pair = *schema_ipc;
403    let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
404
405    let mut schema = vec![];
406    writer::write_message(&mut schema, encoded_data, pair.1)?;
407    Ok(IpcMessage(schema.into()))
408}
409
410impl TryFrom<&FlightData> for Schema {
411    type Error = ArrowError;
412    fn try_from(data: &FlightData) -> ArrowResult<Self> {
413        convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
414            ArrowError::ParseError(format!(
415                "Unable to convert flight data to Arrow schema: {err}"
416            ))
417        })
418    }
419}
420
421impl TryFrom<FlightInfo> for Schema {
422    type Error = ArrowError;
423
424    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
425        value.try_decode_schema()
426    }
427}
428
429impl TryFrom<IpcMessage> for Schema {
430    type Error = ArrowError;
431
432    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
433        try_schema_from_ipc_buffer(&value)
434    }
435}
436
437impl TryFrom<&SchemaResult> for Schema {
438    type Error = ArrowError;
439    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
440        try_schema_from_ipc_buffer(&data.schema)
441    }
442}
443
444impl TryFrom<SchemaResult> for Schema {
445    type Error = ArrowError;
446    fn try_from(data: SchemaResult) -> ArrowResult<Self> {
447        (&data).try_into()
448    }
449}
450
451// FlightData, FlightDescriptor, etc..
452
453impl FlightData {
454    /// Create a new [`FlightData`].
455    ///
456    /// # See Also
457    ///
458    /// See [`FlightDataEncoderBuilder`] for a higher level API to
459    /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
460    ///
461    /// # Example:
462    ///
463    /// ```
464    /// # use bytes::Bytes;
465    /// # use arrow_flight::{FlightData, FlightDescriptor};
466    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
467    /// // Get encoded Arrow IPC data:
468    /// let data_body: Bytes = encode_data();
469    /// // Create the FlightData message
470    /// let flight_data = FlightData::new()
471    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
472    ///   .with_app_metadata("My apps metadata")
473    ///   .with_data_body(data_body);
474    /// ```
475    ///
476    /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
477    /// [`RecordBatch`]: arrow_array::RecordBatch
478    pub fn new() -> Self {
479        Default::default()
480    }
481
482    /// Add a [`FlightDescriptor`] describing the data
483    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
484        self.flight_descriptor = Some(flight_descriptor);
485        self
486    }
487
488    /// Add a data header
489    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
490        self.data_header = data_header.into();
491        self
492    }
493
494    /// Add a data body. See [`IpcDataGenerator`] to create this data.
495    ///
496    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
497    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
498        self.data_body = data_body.into();
499        self
500    }
501
502    /// Add optional application specific metadata to the message
503    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
504        self.app_metadata = app_metadata.into();
505        self
506    }
507}
508
509impl FlightDescriptor {
510    /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset.
511    ///
512    /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227
513    pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
514        FlightDescriptor {
515            r#type: DescriptorType::Cmd.into(),
516            cmd: cmd.into(),
517            ..Default::default()
518        }
519    }
520
521    /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset
522    ///
523    /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222
524    pub fn new_path(path: Vec<String>) -> Self {
525        FlightDescriptor {
526            r#type: DescriptorType::Path.into(),
527            path,
528            ..Default::default()
529        }
530    }
531}
532
533impl FlightInfo {
534    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
535    ///
536    ///
537    /// # Example:
538    /// ```
539    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
540    /// # use arrow_schema::{Schema, Field, DataType};
541    /// # fn get_schema() -> Schema {
542    /// #   Schema::new(vec![
543    /// #     Field::new("a", DataType::Utf8, false),
544    /// #   ])
545    /// # }
546    /// #
547    /// // Create a new FlightInfo
548    /// let flight_info = FlightInfo::new()
549    ///   // Encode the Arrow schema
550    ///   .try_with_schema(&get_schema())
551    ///   .expect("encoding failed")
552    ///   .with_endpoint(
553    ///      FlightEndpoint::new()
554    ///        .with_ticket(Ticket::new("ticket contents")
555    ///      )
556    ///    )
557    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
558    /// ```
559    pub fn new() -> FlightInfo {
560        FlightInfo {
561            schema: Bytes::new(),
562            flight_descriptor: None,
563            endpoint: vec![],
564            ordered: false,
565            // Flight says "Set these to -1 if unknown."
566            //
567            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
568            total_records: -1,
569            total_bytes: -1,
570            app_metadata: Bytes::new(),
571        }
572    }
573
574    /// Try and convert the data in this  `FlightInfo` into a [`Schema`]
575    pub fn try_decode_schema(self) -> ArrowResult<Schema> {
576        let msg = IpcMessage(self.schema);
577        msg.try_into()
578    }
579
580    /// Specify the schema for the response.
581    ///
582    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
583    /// encodes it using the default IPC options.
584    ///
585    /// Returns an error if `schema` can not be encoded into IPC form.
586    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
587        let options = IpcWriteOptions::default();
588        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
589        self.schema = schema;
590        Ok(self)
591    }
592
593    /// Add specific a endpoint for fetching the data
594    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
595        self.endpoint.push(endpoint);
596        self
597    }
598
599    /// Add a [`FlightDescriptor`] describing what this data is
600    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
601        self.flight_descriptor = Some(flight_descriptor);
602        self
603    }
604
605    /// Set the number of records in the result, if known
606    pub fn with_total_records(mut self, total_records: i64) -> Self {
607        self.total_records = total_records;
608        self
609    }
610
611    /// Set the number of bytes in the result, if known
612    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
613        self.total_bytes = total_bytes;
614        self
615    }
616
617    /// Specify if the response is [ordered] across endpoints
618    ///
619    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
620    pub fn with_ordered(mut self, ordered: bool) -> Self {
621        self.ordered = ordered;
622        self
623    }
624
625    /// Add optional application specific metadata to the message
626    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
627        self.app_metadata = app_metadata.into();
628        self
629    }
630}
631
632impl PollInfo {
633    /// Create a new, empty [`PollInfo`], providing information for a long-running query
634    ///
635    /// # Example:
636    /// ```
637    /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor};
638    /// # use prost_types::Timestamp;
639    /// // Create a new PollInfo
640    /// let poll_info = PollInfo::new()
641    ///   .with_info(FlightInfo::new())
642    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"))
643    ///   .try_with_progress(0.5)
644    ///   .expect("progress should've been valid")
645    ///   .with_expiration_time(
646    ///     "1970-01-01".parse().expect("invalid timestamp")
647    ///   );
648    /// ```
649    pub fn new() -> Self {
650        Self {
651            info: None,
652            flight_descriptor: None,
653            progress: None,
654            expiration_time: None,
655        }
656    }
657
658    /// Add the current available results for the poll call as a [`FlightInfo`]
659    pub fn with_info(mut self, info: FlightInfo) -> Self {
660        self.info = Some(info);
661        self
662    }
663
664    /// Add a [`FlightDescriptor`] that the client should use for the next poll call,
665    /// if the query is not yet complete
666    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
667        self.flight_descriptor = Some(flight_descriptor);
668        self
669    }
670
671    /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will
672    /// return an error
673    pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
674        if !(0.0..=1.0).contains(&progress) {
675            return Err(ArrowError::InvalidArgumentError(format!(
676                "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
677            )));
678        }
679        self.progress = Some(progress);
680        Ok(self)
681    }
682
683    /// Specify expiration time for this request
684    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
685        self.expiration_time = Some(expiration_time);
686        self
687    }
688}
689
690impl<'a> SchemaAsIpc<'a> {
691    /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
692    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
693        SchemaAsIpc {
694            pair: (schema, options),
695        }
696    }
697}
698
699impl CancelFlightInfoRequest {
700    /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`]
701    /// of the query to cancel.
702    pub fn new(info: FlightInfo) -> Self {
703        Self { info: Some(info) }
704    }
705}
706
707impl CancelFlightInfoResult {
708    /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`].
709    pub fn new(status: CancelStatus) -> Self {
710        Self {
711            status: status as i32,
712        }
713    }
714}
715
716impl RenewFlightEndpointRequest {
717    /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`]
718    /// for which is being requested an extension of its expiration.
719    pub fn new(endpoint: FlightEndpoint) -> Self {
720        Self {
721            endpoint: Some(endpoint),
722        }
723    }
724}
725
726impl Action {
727    /// Create a new Action with type and body
728    pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
729        Self {
730            r#type: action_type.into(),
731            body: body.into(),
732        }
733    }
734}
735
736impl Result {
737    /// Create a new Result with the specified body
738    pub fn new(body: impl Into<Bytes>) -> Self {
739        Self { body: body.into() }
740    }
741}
742
743impl Ticket {
744    /// Create a new `Ticket`
745    ///
746    /// # Example
747    ///
748    /// ```
749    /// # use arrow_flight::Ticket;
750    /// let ticket = Ticket::new("SELECT * from FOO");
751    /// ```
752    pub fn new(ticket: impl Into<Bytes>) -> Self {
753        Self {
754            ticket: ticket.into(),
755        }
756    }
757}
758
759impl FlightEndpoint {
760    /// Create a new, empty `FlightEndpoint` that represents a location
761    /// to retrieve Flight results.
762    ///
763    /// # Example
764    /// ```
765    /// # use arrow_flight::{FlightEndpoint, Ticket};
766    /// #
767    /// // Specify the client should fetch results from this server
768    /// let endpoint = FlightEndpoint::new()
769    ///   .with_ticket(Ticket::new("the ticket"));
770    ///
771    /// // Specify the client should fetch results from either
772    /// // `http://example.com` or `https://example.com`
773    /// let endpoint = FlightEndpoint::new()
774    ///   .with_ticket(Ticket::new("the ticket"))
775    ///   .with_location("http://example.com")
776    ///   .with_location("https://example.com");
777    /// ```
778    pub fn new() -> FlightEndpoint {
779        Default::default()
780    }
781
782    /// Set the [`Ticket`] used to retrieve data from the endpoint
783    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
784        self.ticket = Some(ticket);
785        self
786    }
787
788    /// Add a location `uri` to this endpoint. Note each endpoint can
789    /// have multiple locations.
790    ///
791    /// If no `uri` is specified, the [Flight Spec] says:
792    ///
793    /// ```text
794    /// * If the list is empty, the expectation is that the ticket can only
795    /// * be redeemed on the current service where the ticket was
796    /// * generated.
797    /// ```
798    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
799    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
800        self.location.push(Location { uri: uri.into() });
801        self
802    }
803
804    /// Specify expiration time for this stream
805    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
806        self.expiration_time = Some(expiration_time);
807        self
808    }
809
810    /// Add optional application specific metadata to the message
811    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
812        self.app_metadata = app_metadata.into();
813        self
814    }
815}
816
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes to print plus the limit handed to `limited_fmt`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let TestVector(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command".to_vec();
        let fd = FlightDescriptor::new_cmd(expected_cmd.clone());
        assert_eq!(fd.r#type(), DescriptorType::Cmd);
        assert_eq!(fd.cmd, expected_cmd);
    }

    #[test]
    fn it_accepts_equal_output() {
        // Length equals the limit: nothing is cut off.
        let actual = TestVector(vec![91; 10], 10).to_string();
        let expected = format!("{:?}", vec![91; 10]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn it_accepts_short_output() {
        // Length below the limit: nothing is cut off.
        let actual = TestVector(vec![91; 6], 10).to_string();
        let expected = format!("{:?}", vec![91; 6]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn it_accepts_long_output() {
        // Length above the limit: only the first `limit` bytes are printed.
        let actual = TestVector(vec![91; 10], 9).to_string();
        let expected = format!("{:?}", vec![91; 9]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        let write_options = [
            // V5 with write_legacy_ipc_format = false
            // this will write the continuation marker
            IpcWriteOptions::default(),
            // V4 with write_legacy_ipc_format = true
            // this will not write the continuation marker
            IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
        ];

        for option in &write_options {
            let encoded = SchemaResult::try_from(SchemaAsIpc::new(&schema, option)).unwrap();
            let decoded = Schema::try_from(&encoded).unwrap();
            assert_eq!(schema, decoded);
        }
    }
}