parquet_key_management/
lib.rs

1//! Encryption Key Management Tools for Parquet
2//!
3//! This library provides tools for integrating with a Key Management Server (KMS)
4//! to read and write encrypted Parquet files.
5//!
6//! Envelope encryption is used, where the Parquet file is encrypted with data encryption keys
7//! (DEKs) that are randomly generated per file,
8//! and the DEKs are encrypted with master encryption keys (MEKs) that are managed by a KMS.
9//! Double wrapping is used by default, where the DEKs are first encrypted with key encryption
10//! keys (KEKs) that are then encrypted with MEKs, to reduce KMS interactions.
11//!
12//! # Usage
13//! Using this module requires defining your own type that implements the
14//! [`KmsClient`](kms::KmsClient) trait and interacts with your organization's KMS.
15//!
16//! This `KmsClient` can then be used by the
17//! [`CryptoFactory`](crypto_factory::CryptoFactory) type to generate
18//! [`FileEncryptionProperties`](parquet::encryption::encrypt::FileEncryptionProperties)
19//! for writing encrypted Parquet files and
20//! [`FileDecryptionProperties`](parquet::encryption::decrypt::FileDecryptionProperties)
21//! for reading files.
22//!
23//! # `async` usage (`async` feature)
24//! This module also provides an [`AsyncKmsClient`](kms::AsyncKmsClient) trait that can be used
25//! with the [`async_reader`] and [`async_writer`] modules.
26//!
27//! See example on [`AsyncKmsClient`](kms::AsyncKmsClient).
28//!
29//! # Compatibility
30//! The encryption key metadata that is stored in the Parquet file is compatible with other Parquet
31//! implementations (PyArrow and parquet-java for example), so that files encrypted with this
32//! module may be decrypted by those implementations, and vice versa, as long as the
33//! `KmsClient` implementations are compatible.
34//!
35//! # Example of writing then reading an encrypted Parquet file
36//! ```
37//! use arrow_array::{ArrayRef, Float32Array, Int32Array, RecordBatch};
38//! use base64::prelude::BASE64_STANDARD;
39//! use base64::Engine;
40//! use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
41//! use parquet::arrow::ArrowWriter;
42//! use parquet_key_management::crypto_factory::{
43//!     CryptoFactory, DecryptionConfiguration, EncryptionConfigurationBuilder,
44//! };
45//! use parquet_key_management::kms::{KmsClient, KmsConnectionConfig};
46//! use parquet::errors::{ParquetError, Result};
47//! use parquet::file::properties::WriterProperties;
48//! use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM, NONCE_LEN};
49//! use ring::rand::{SecureRandom, SystemRandom};
50//! use std::collections::HashMap;
51//! use std::fs::File;
52//! use std::sync::Arc;
53//! use tempfile::TempDir;
54//!
55//! let temp_dir = TempDir::new()?;
56//! let file_path = temp_dir.path().join("encrypted_example.parquet");
57//!
58//! // Create a CryptoFactory, providing a factory function
59//! // that will create an example KMS client
60//! let crypto_factory = CryptoFactory::new(DemoKmsClient::create);
61//!
62//! // Specify any options required to connect to our KMS.
63//! // These are ignored by the DemoKmsClient but shown here for illustration.
64//! // The KMS instance ID and URL will be stored in the Parquet encryption metadata
65//! // so they don't need to be specified if you are only reading files.
66//! let connection_config = Arc::new(
67//!     KmsConnectionConfig::builder()
68//!         .set_kms_instance_id("kms1".into())
69//!         .set_kms_instance_url("https://example.com/kms".into())
70//!         .set_key_access_token("secret_token".into())
71//!         .set_custom_kms_conf_option("custom_option".into(), "some_value".into())
72//!         .build(),
73//! );
74//!
75//! // Create an encryption configuration that will encrypt the footer with the "kf" key,
76//! // the "x" column with the "kc1" key, and the "y" column with the "kc2" key,
77//! // while leaving the "id" column unencrypted.
78//! let encryption_config = EncryptionConfigurationBuilder::new("kf".into())
79//!     .add_column_key("kc1".into(), vec!["x".into()])
80//!     .add_column_key("kc2".into(), vec!["y".into()])
81//!     .build()?;
82//!
83//! // Use the CryptoFactory to generate file encryption properties using the configuration
84//! let encryption_properties =
85//!     crypto_factory.file_encryption_properties(connection_config.clone(), &encryption_config)?;
86//! let writer_properties = WriterProperties::builder()
87//!     .with_file_encryption_properties(encryption_properties)
88//!     .build();
89//!
90//! // Write the encrypted Parquet file
91//! {
92//!     let file = File::create(&file_path)?;
93//!
94//!     let ids = Int32Array::from(vec![0, 1, 2, 3, 4, 5]);
95//!     let x_vals = Float32Array::from(vec![0.0, 0.1, 0.2, 0.3, 0.4, 0.5]);
96//!     let y_vals = Float32Array::from(vec![1.0, 1.1, 1.2, 1.3, 1.4, 1.5]);
97//!     let batch = RecordBatch::try_from_iter(vec![
98//!         ("id", Arc::new(ids) as ArrayRef),
99//!         ("x", Arc::new(x_vals) as ArrayRef),
100//!         ("y", Arc::new(y_vals) as ArrayRef),
101//!     ])?;
102//!
103//!     let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(writer_properties))?;
104//!
105//!     writer.write(&batch)?;
106//!     writer.close()?;
107//! }
108//!
109//! // Use the CryptoFactory to generate file decryption properties.
110//! // We don't need to specify which columns are encrypted and which keys are used,
111//! // that information is stored in the file metadata.
112//! let decryption_config = DecryptionConfiguration::default();
113//! let decryption_properties =
114//!     crypto_factory.file_decryption_properties(connection_config, decryption_config)?;
115//! let reader_options =
116//!     ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
117//!
118//! // Read the file using the configured decryption properties
119//! let file = File::open(&file_path)?;
120//!
121//! let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, reader_options)?;
122//! let record_reader = builder.build()?;
123//! for batch in record_reader {
124//!     let batch = batch?;
125//!     println!("Read batch: {batch:?}");
126//! }
127//!
128//! /// Example KMS client that uses in-memory AES keys.
129//! /// A real KMS client should interact with a Key Management Server to encrypt and decrypt keys.
130//! pub struct DemoKmsClient {
131//!     key_map: HashMap<String, Vec<u8>>,
132//! }
133//!
134//! impl DemoKmsClient {
135//!     pub fn create(_config: &KmsConnectionConfig) -> Result<Arc<dyn KmsClient>> {
136//!         let mut key_map = HashMap::default();
137//!         key_map.insert("kf".into(), "0123456789012345".into());
138//!         key_map.insert("kc1".into(), "1234567890123450".into());
139//!         key_map.insert("kc2".into(), "1234567890123451".into());
140//!
141//!         Ok(Arc::new(Self { key_map }))
142//!     }
143//!
144//!     /// Get the AES key corresponding to a key identifier
145//!     fn get_key(&self, master_key_identifier: &str) -> Result<LessSafeKey> {
146//!         let key = self.key_map.get(master_key_identifier).ok_or_else(|| {
147//!             ParquetError::General(format!("Invalid master key '{master_key_identifier}'"))
148//!         })?;
149//!         let key = UnboundKey::new(&AES_128_GCM, key)
150//!             .map_err(|e| ParquetError::General(format!("Error creating AES key '{e}'")))?;
151//!         Ok(LessSafeKey::new(key))
152//!     }
153//! }
154//!
155//! impl KmsClient for DemoKmsClient {
156//!     /// Take a randomly generated key and encrypt it using the specified master key
157//!     fn wrap_key(&self, key_bytes: &[u8], master_key_identifier: &str) -> Result<String> {
158//!         let master_key = self.get_key(master_key_identifier)?;
159//!         let aad = master_key_identifier.as_bytes();
160//!         let rng = SystemRandom::new();
161//!
162//!         let mut nonce = [0u8; NONCE_LEN];
163//!         rng.fill(&mut nonce)?;
164//!         let nonce = ring::aead::Nonce::assume_unique_for_key(nonce);
165//!
166//!         let tag_len = master_key.algorithm().tag_len();
167//!         let mut ciphertext = Vec::with_capacity(NONCE_LEN + key_bytes.len() + tag_len);
168//!         ciphertext.extend_from_slice(nonce.as_ref());
169//!         ciphertext.extend_from_slice(key_bytes);
170//!         let tag = master_key.seal_in_place_separate_tag(
171//!             nonce,
172//!             Aad::from(aad),
173//!             &mut ciphertext[NONCE_LEN..],
174//!         )?;
175//!         ciphertext.extend_from_slice(tag.as_ref());
176//!         let encoded = BASE64_STANDARD.encode(&ciphertext);
177//!
178//!         Ok(encoded)
179//!     }
180//!
181//!     /// Take an encrypted key and decrypt it using the specified master key identifier
182//!     fn unwrap_key(&self, wrapped_key: &str, master_key_identifier: &str) -> Result<Vec<u8>> {
183//!         let wrapped_key = BASE64_STANDARD.decode(wrapped_key).map_err(|e| {
184//!             ParquetError::General(format!("Error base64 decoding wrapped key: {e}"))
185//!         })?;
186//!         let master_key = self.get_key(master_key_identifier)?;
187//!         let aad = master_key_identifier.as_bytes();
188//!         let nonce_bytes = wrapped_key.get(..NONCE_LEN).ok_or_else(|| {
189//!             ParquetError::General("Wrapped key is too short to contain a nonce".into())
190//!         })?;
191//!         let nonce = ring::aead::Nonce::try_assume_unique_for_key(nonce_bytes)?;
192//!         let mut plaintext = wrapped_key[NONCE_LEN..].to_vec();
193//!         master_key.open_in_place(nonce, Aad::from(aad), &mut plaintext)?;
194//!         plaintext.resize(plaintext.len() - master_key.algorithm().tag_len(), 0u8);
195//!
196//!         Ok(plaintext)
197//!     }
198//! }
199//!
200//! # Ok::<(), parquet::errors::ParquetError>(())
201//! ```
202//!
203//! [`async_reader`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/index.html
204//! [`async_writer`]: https://docs.rs/parquet/latest/parquet/arrow/async_writer/index.html
205
206#![cfg_attr(docsrs, feature(doc_cfg))] // turn on the `doc_cfg` feature only when the `docsrs` cfg is set
207
208pub mod crypto_factory; // public: `CryptoFactory` and the encryption/decryption configuration types
209#[cfg(feature = "datafusion")] // compiled only when the `datafusion` Cargo feature is enabled
210pub mod datafusion;
211mod key_encryption; // crate-private
212pub mod key_material;
213mod key_unwrapper; // crate-private
214mod key_wrapper; // crate-private
215pub mod kms; // public: `KmsClient`, `AsyncKmsClient` and `KmsConnectionConfig`
216mod kms_manager; // crate-private
217#[cfg(any(test, feature = "_test_utils"))] // built only for tests or with the `_test_utils` feature
218#[doc(hidden)] // public but excluded from the generated documentation
219pub mod test_kms;