use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use chrono::Utc;
use conflict_checker::ConflictChecker;
use delta_kernel::table_properties::TableProperties;
use futures::future::BoxFuture;
use object_store::Error as ObjectStoreError;
use object_store::ObjectStoreExt as _;
use object_store::path::Path;
use serde_json::Value;
use tracing::*;
use uuid::Uuid;
use delta_kernel::table_features::TableFeature;
use serde::{Deserialize, Serialize};
use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::errors::DeltaTableError;
use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction, Version};
use crate::logstore::ObjectStoreRef;
use crate::logstore::{CommitOrBytes, LogStoreRef};
use crate::operations::CustomExecuteHandler;
use crate::protocol::DeltaOperation;
use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
use crate::table::config::TablePropertiesExt as _;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, crate_version};
pub use self::conflict_checker::CommitConflictError;
pub use self::protocol::INSTANCE as PROTOCOL;
#[cfg(test)]
pub(crate) mod application;
mod conflict_checker;
mod protocol;
#[cfg(feature = "datafusion")]
mod state;
/// Folder inside the table root that holds the Delta transaction log.
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default number of commit attempts after a version conflict before giving up.
pub(crate) const DEFAULT_RETRIES: usize = 15;
/// Metrics collected while committing a transaction to the Delta log.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitMetrics {
    /// Number of retries performed before the commit succeeded.
    pub num_retries: u64,
}
/// Metrics collected by the post-commit hook (checkpointing and log cleanup).
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostCommitMetrics {
    /// Whether a new checkpoint was written during the post-commit hook.
    pub new_checkpoint_created: bool,
    /// Number of expired log files that were deleted.
    pub num_log_files_cleaned_up: u64,
}
/// Combined metrics for a finalized commit: the commit loop itself plus the
/// post-commit hook work.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Number of retries performed before the commit succeeded.
    pub num_retries: u64,
    /// Whether a new checkpoint was written during the post-commit hook.
    pub new_checkpoint_created: bool,
    /// Number of expired log files that were deleted.
    pub num_log_files_cleaned_up: u64,
}
/// Errors that can occur while committing a transaction to a Delta table.
#[derive(thiserror::Error, Debug)]
pub enum TransactionError {
    /// The targeted log version was already written by another writer.
    #[error("Tried committing existing table version: {0}")]
    VersionAlreadyExists(Version),
    /// An action could not be serialized to JSON for the log entry.
    #[error("Error serializing commit log to json: {json_err}")]
    SerializeLogJson {
        /// The underlying serde error.
        json_err: serde_json::error::Error,
    },
    /// The underlying object store reported an error.
    #[error("Log storage error: {}", .source)]
    ObjectStore {
        /// The underlying object store error.
        #[from]
        source: ObjectStoreError,
    },
    /// A conflicting concurrent commit could not be reconciled.
    #[error("Failed to commit transaction: {0}")]
    CommitConflict(#[from] CommitConflictError),
    /// All commit attempts were exhausted without success.
    #[error("Failed to commit transaction: {0}")]
    MaxCommitAttempts(i32),
    /// A `Remove` action with data change was attempted on an append-only table.
    #[error(
        "The transaction includes Remove action with data change but Delta table is append-only"
    )]
    DeltaTableAppendOnly,
    /// The table requires features this writer does not support.
    #[error("Unsupported table features required: {0:?}")]
    UnsupportedTableFeatures(Vec<TableFeature>),
    /// A required table feature was not specified.
    #[error("Table features must be specified, please specify: {0:?}")]
    TableFeaturesRequired(TableFeature),
    /// An error surfaced by the log store implementation.
    #[error("Transaction failed: {msg}")]
    LogStoreError {
        /// Human-readable description of the failure.
        msg: String,
        /// The underlying error.
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}
impl From<TransactionError> for DeltaTableError {
fn from(err: TransactionError) -> Self {
match err {
TransactionError::VersionAlreadyExists(version) => {
DeltaTableError::VersionAlreadyExists(version)
}
TransactionError::SerializeLogJson { json_err } => {
DeltaTableError::SerializeLogJson { json_err }
}
TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
other => DeltaTableError::Transaction { source: other },
}
}
}
/// Errors raised while a [`CommitBuilder`] validates its inputs.
///
/// The enum currently has no variants; it exists so the conversion into
/// [`DeltaTableError`] stays stable if validation failures are added later.
#[derive(thiserror::Error, Debug)]
pub enum CommitBuilderError {}

impl From<CommitBuilderError> for DeltaTableError {
    fn from(err: CommitBuilderError) -> Self {
        DeltaTableError::CommitValidation { source: err }
    }
}
/// Read-only view of the table state a transaction needs in order to be
/// validated and committed.
pub trait TableReference: Send + Sync {
    /// Table properties (configuration) of the referenced table.
    fn config(&self) -> &TableProperties;
    /// Protocol versions / features of the referenced table.
    fn protocol(&self) -> &Protocol;
    /// Current table metadata.
    fn metadata(&self) -> &Metadata;
    /// The eager snapshot backing this reference.
    fn eager_snapshot(&self) -> &EagerSnapshot;
}
impl TableReference for EagerSnapshot {
    fn protocol(&self) -> &Protocol {
        // Fully qualified call to disambiguate from this trait method itself.
        EagerSnapshot::protocol(self)
    }
    fn metadata(&self) -> &Metadata {
        // Fully qualified call to disambiguate from this trait method itself.
        EagerSnapshot::metadata(self)
    }
    fn config(&self) -> &TableProperties {
        self.table_properties()
    }
    fn eager_snapshot(&self) -> &EagerSnapshot {
        self
    }
}
// Delegates every accessor to the state's inner snapshot.
impl TableReference for DeltaTableState {
    fn config(&self) -> &TableProperties {
        self.snapshot.config()
    }
    fn protocol(&self) -> &Protocol {
        self.snapshot.protocol()
    }
    fn metadata(&self) -> &Metadata {
        self.snapshot.metadata()
    }
    fn eager_snapshot(&self) -> &EagerSnapshot {
        &self.snapshot
    }
}
/// Data written to the Delta log as a single version entry.
#[derive(Debug)]
pub struct CommitData {
    /// Actions to commit (a leading `CommitInfo` is added by [`CommitData::new`]).
    pub actions: Vec<Action>,
    /// The operation that produced the actions.
    pub operation: DeltaOperation,
    /// Application-specific metadata merged into the commit info.
    pub app_metadata: HashMap<String, Value>,
    /// Application-level transactions included as `txn` actions.
    pub app_transactions: Vec<Transaction>,
}
impl CommitData {
    /// Assemble the full action list for a commit.
    ///
    /// If `actions` does not already contain a `CommitInfo`, one is generated
    /// from the operation, stamped with the current time, merged with
    /// `app_metadata` (which receives a default `clientVersion` entry when
    /// the caller did not set one), and placed first. Application transaction
    /// actions are appended at the end.
    pub fn new(
        mut actions: Vec<Action>,
        operation: DeltaOperation,
        mut app_metadata: HashMap<String, Value>,
        app_transactions: Vec<Transaction>,
    ) -> Self {
        let has_commit_info = actions
            .iter()
            .any(|action| matches!(action, Action::CommitInfo(..)));
        if !has_commit_info {
            let mut commit_info = operation.get_commit_info();
            commit_info.timestamp = Some(Utc::now().timestamp_millis());
            app_metadata
                .entry("clientVersion".to_string())
                .or_insert(Value::String(format!("delta-rs.{}", crate_version())));
            // Operation-provided info wins over caller metadata on key clashes.
            app_metadata.extend(commit_info.info);
            commit_info.info = app_metadata.clone();
            actions.insert(0, Action::CommitInfo(commit_info));
        }
        actions.extend(app_transactions.iter().cloned().map(Action::Txn));
        CommitData {
            actions,
            operation,
            app_metadata,
            app_transactions,
        }
    }

    /// Serialize all actions as newline-delimited JSON — the on-disk layout
    /// of a Delta commit file.
    pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
        let jsons = self
            .actions
            .iter()
            .map(|action| {
                serde_json::to_string(action)
                    .map_err(|json_err| TransactionError::SerializeLogJson { json_err })
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok(bytes::Bytes::from(jsons.join("\n")))
    }
}
/// Switches controlling the work that runs right after a successful commit.
#[derive(Clone, Debug, Copy)]
pub struct PostCommitHookProperties {
    // Whether to attempt checkpoint creation after committing.
    create_checkpoint: bool,
    // Override for expired-log cleanup; `None` falls back to the table config.
    cleanup_expired_logs: Option<bool>,
}
/// User-facing configuration for a commit: metadata, retries, and
/// post-commit behavior. Converted into a [`CommitBuilder`] internally.
#[derive(Clone, Debug)]
pub struct CommitProperties {
    // Extra metadata merged into the commit's `CommitInfo`.
    pub(crate) app_metadata: HashMap<String, Value>,
    // Application transactions recorded as `txn` actions.
    pub(crate) app_transaction: Vec<Transaction>,
    // Maximum number of retries after version conflicts.
    max_retries: usize,
    // Whether to attempt a checkpoint after committing.
    create_checkpoint: bool,
    // Override for expired-log cleanup; `None` defers to the table config.
    cleanup_expired_logs: Option<bool>,
}
impl Default for CommitProperties {
    /// Defaults: no extra metadata/transactions, [`DEFAULT_RETRIES`] retries,
    /// checkpointing enabled, log cleanup deferred to the table config.
    fn default() -> Self {
        Self {
            app_metadata: Default::default(),
            app_transaction: Vec::new(),
            max_retries: DEFAULT_RETRIES,
            create_checkpoint: true,
            cleanup_expired_logs: None,
        }
    }
}
impl CommitProperties {
    /// Replace the application metadata with the given key/value pairs.
    pub fn with_metadata(
        mut self,
        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
    ) -> Self {
        self.app_metadata = HashMap::from_iter(metadata);
        self
    }
    /// Set the maximum number of commit retries after version conflicts.
    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }
    /// Enable or disable checkpoint creation after the commit.
    pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
        self.create_checkpoint = create_checkpoint;
        self
    }
    /// Append a single application transaction to the commit.
    pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
        self.app_transaction.push(txn);
        self
    }
    /// Replace the list of application transactions for the commit.
    pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
        self.app_transaction = txn;
        self
    }
    /// Override expired-log cleanup; `None` defers to the table configuration.
    pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
        self.cleanup_expired_logs = cleanup_expired_logs;
        self
    }
}
impl From<CommitProperties> for CommitBuilder {
fn from(value: CommitProperties) -> Self {
CommitBuilder {
max_retries: value.max_retries,
app_metadata: value.app_metadata,
post_commit_hook: Some(PostCommitHookProperties {
create_checkpoint: value.create_checkpoint,
cleanup_expired_logs: value.cleanup_expired_logs,
}),
app_transaction: value.app_transaction,
..Default::default()
}
}
}
/// Internal builder that assembles everything needed to run a commit.
pub struct CommitBuilder {
    // Actions to write into the log entry.
    actions: Vec<Action>,
    // Extra metadata merged into the commit's `CommitInfo`.
    app_metadata: HashMap<String, Value>,
    // Application transactions recorded as `txn` actions.
    app_transaction: Vec<Transaction>,
    // Maximum number of retries after version conflicts.
    max_retries: usize,
    // Post-commit checkpoint/cleanup configuration, if any.
    post_commit_hook: Option<PostCommitHookProperties>,
    // Optional custom handler invoked around the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    // Id used to correlate log-store interactions for this operation.
    operation_id: Uuid,
}
impl Default for CommitBuilder {
    /// Defaults: empty commit payload, [`DEFAULT_RETRIES`] retries, no
    /// post-commit hook, and a freshly generated operation id.
    fn default() -> Self {
        CommitBuilder {
            actions: Vec::new(),
            app_metadata: HashMap::new(),
            app_transaction: Vec::new(),
            max_retries: DEFAULT_RETRIES,
            post_commit_hook: None,
            post_commit_hook_handler: None,
            operation_id: Uuid::new_v4(),
        }
    }
}
impl CommitBuilder {
    /// Set the actions that make up the commit.
    pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
        self.actions = actions;
        self
    }
    /// Set extra application metadata merged into the commit info.
    pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
        self.app_metadata = app_metadata;
        self
    }
    /// Set the maximum number of commit retries after version conflicts.
    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }
    /// Configure the checkpoint/cleanup work run after a successful commit.
    pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
        self.post_commit_hook = Some(post_commit_hook);
        self
    }
    /// Override the operation id used to correlate log-store interactions.
    pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
        self.operation_id = operation_id;
        self
    }
    /// Install (or clear) a custom handler invoked around the post-commit hook.
    pub fn with_post_commit_hook_handler(
        mut self,
        handler: Option<Arc<dyn CustomExecuteHandler>>,
    ) -> Self {
        self.post_commit_hook_handler = handler;
        self
    }
    /// Finalize the builder into a [`PreCommit`] against `table_data`.
    ///
    /// Passing `None` for `table_data` means the commit creates the table
    /// (version 0). The lifetime is scoped to this method — previously it was
    /// declared on the whole `impl` even though only `build` uses it.
    pub fn build<'a>(
        self,
        table_data: Option<&'a dyn TableReference>,
        log_store: LogStoreRef,
        operation: DeltaOperation,
    ) -> PreCommit<'a> {
        let data = CommitData::new(
            self.actions,
            operation,
            self.app_metadata,
            self.app_transaction,
        );
        PreCommit {
            log_store,
            table_data,
            max_retries: self.max_retries,
            data,
            post_commit_hook: self.post_commit_hook,
            post_commit_hook_handler: self.post_commit_hook_handler,
            operation_id: self.operation_id,
        }
    }
}
/// A commit that has been assembled but not yet validated or written.
pub struct PreCommit<'a> {
    log_store: LogStoreRef,
    // `None` when this commit creates the table (version 0).
    table_data: Option<&'a dyn TableReference>,
    data: CommitData,
    max_retries: usize,
    post_commit_hook: Option<PostCommitHookProperties>,
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    operation_id: Uuid,
}
impl<'a> std::future::IntoFuture for PreCommit<'a> {
    type Output = DeltaResult<FinalizedCommit>;
    type IntoFuture = BoxFuture<'a, Self::Output>;
    /// Convenience: prepare, commit (with retries), then run post-commit
    /// hooks — each stage is itself awaited.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
    }
}
impl<'a> PreCommit<'a> {
    /// Validate the commit against the table protocol and stage its payload.
    ///
    /// Depending on the log store, the serialized actions are either kept in
    /// memory (`LogBytes`) or written to a temporary file under `_delta_log`
    /// (`TmpCommit`) to be renamed into place later.
    pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
        let this = self;
        // Writes the entry to `_delta_log/_commit_<uuid>.json.tmp`.
        async fn write_tmp_commit(
            log_entry: Bytes,
            store: ObjectStoreRef,
        ) -> DeltaResult<CommitOrBytes> {
            let token = uuid::Uuid::new_v4().to_string();
            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
            store.put(&path, log_entry.into()).await?;
            Ok(CommitOrBytes::TmpCommit(path))
        }
        Box::pin(async move {
            // Protocol checks only apply when committing against an existing table.
            if let Some(table_reference) = this.table_data {
                PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
            }
            let log_entry = this.data.get_bytes()?;
            // NOTE(review): dispatch on the log store's *name* is stringly-typed.
            // These two stores take the raw bytes directly; all others get a
            // tmp-commit file.
            let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
                .contains(&this.log_store.name().as_str())
            {
                CommitOrBytes::LogBytes(log_entry)
            } else {
                write_tmp_commit(
                    log_entry,
                    this.log_store.object_store(Some(this.operation_id)),
                )
                .await?
            };
            Ok(PreparedCommit {
                commit_or_bytes,
                log_store: this.log_store,
                table_data: this.table_data,
                max_retries: this.max_retries,
                data: this.data,
                post_commit: this.post_commit_hook,
                post_commit_hook_handler: this.post_commit_hook_handler,
                operation_id: this.operation_id,
            })
        })
    }
}
/// A commit whose payload has been staged and is ready to be written to the
/// log (awaiting it runs the retry loop).
pub struct PreparedCommit<'a> {
    // The staged payload: in-memory bytes or a tmp-commit file path.
    commit_or_bytes: CommitOrBytes,
    log_store: LogStoreRef,
    data: CommitData,
    // `None` when this commit creates the table (version 0).
    table_data: Option<&'a dyn TableReference>,
    max_retries: usize,
    post_commit: Option<PostCommitHookProperties>,
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    operation_id: Uuid,
}
impl PreparedCommit<'_> {
    /// The staged commit payload (bytes or tmp-commit path).
    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
        &self.commit_or_bytes
    }
}
impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
    type Output = DeltaResult<PostCommit>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Write the prepared commit at the next table version, retrying with
    /// conflict checking when other writers land commits concurrently.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;
        Box::pin(async move {
            let commit_or_bytes = this.commit_or_bytes;
            let mut attempt_number: usize = 1;
            // Without table data this is the table-creating commit (version 0).
            let read_snapshot: EagerSnapshot = if let Some(table_data) = this.table_data {
                table_data.eager_snapshot().clone()
            } else {
                debug!("committing initial table version 0");
                match this
                    .log_store
                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
                    .await
                {
                    Ok(_) => {
                        // Fresh table: no checkpoint/cleanup needed yet.
                        return Ok(PostCommit {
                            version: 0,
                            data: this.data,
                            create_checkpoint: false,
                            cleanup_expired_logs: None,
                            log_store: this.log_store,
                            table_data: None,
                            custom_execute_handler: this.post_commit_hook_handler,
                            metrics: CommitMetrics { num_retries: 0 },
                        });
                    }
                    Err(TransactionError::VersionAlreadyExists(0)) => {
                        // Another writer created the table first: load its
                        // state and fall into the regular retry loop below.
                        debug!("version 0 already exists, loading table state for retry");
                        attempt_number = 2;
                        let latest_version: Version = this.log_store.get_latest_version(0).await?;
                        EagerSnapshot::try_new(
                            this.log_store.as_ref(),
                            Default::default(),
                            Some(latest_version),
                        )
                        .await?
                    }
                    Err(e) => return Err(e.into()),
                }
            };
            // Re-bind mutably: the snapshot advances as conflicts are resolved.
            let mut read_snapshot = read_snapshot;
            let commit_span = info_span!(
                "commit_with_retries",
                base_version = read_snapshot.version(),
                max_retries = this.max_retries,
                attempt = field::Empty,
                target_version = field::Empty,
                conflicts_checked = 0
            );
            async move {
                let total_retries = this.max_retries + 1;
                while attempt_number <= total_retries {
                    Span::current().record("attempt", attempt_number);
                    let latest_version = this
                        .log_store
                        .get_latest_version(read_snapshot.version())
                        .await?;
                    if latest_version > read_snapshot.version() {
                        // With retries disabled, any concurrent write is fatal.
                        if this.max_retries == 0 {
                            warn!(
                                base_version = read_snapshot.version(),
                                latest_version = latest_version,
                                "table updated but max_retries is 0, failing immediately"
                            );
                            return Err(TransactionError::MaxCommitAttempts(
                                this.max_retries as i32,
                            )
                            .into());
                        }
                        warn!(
                            base_version = read_snapshot.version(),
                            latest_version = latest_version,
                            versions_behind = latest_version - read_snapshot.version(),
                            "table updated during transaction, checking for conflicts"
                        );
                        // Check every winning commit between our base version
                        // and the latest version, oldest first.
                        let mut steps = latest_version - read_snapshot.version();
                        let mut conflicts_checked = 0;
                        while steps != 0 {
                            conflicts_checked += 1;
                            let summary = WinningCommitSummary::try_new(
                                this.log_store.as_ref(),
                                latest_version - steps,
                                (latest_version - steps) + 1,
                            )
                            .await?;
                            let transaction_info = TransactionInfo::try_new(
                                read_snapshot.log_data(),
                                this.data.operation.read_predicate(),
                                &this.data.actions,
                                this.data.operation.read_whole_table(),
                            )?;
                            let conflict_checker = ConflictChecker::new(
                                transaction_info,
                                summary,
                                Some(&this.data.operation),
                            );
                            match conflict_checker.check_conflicts() {
                                Ok(_) => {}
                                Err(err) => {
                                    error!(
                                        conflicts_checked = conflicts_checked,
                                        error = %err,
                                        "conflict detected, aborting transaction"
                                    );
                                    return Err(TransactionError::CommitConflict(err).into());
                                }
                            }
                            steps -= 1;
                        }
                        Span::current().record("conflicts_checked", conflicts_checked);
                        debug!(
                            conflicts_checked = conflicts_checked,
                            "all conflicts resolved, updating snapshot"
                        );
                        read_snapshot
                            .update(&this.log_store, Some(latest_version))
                            .await?;
                    }
                    let version: Version = latest_version + 1;
                    Span::current().record("target_version", version);
                    match this
                        .log_store
                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
                        .await
                    {
                        Ok(()) => {
                            info!(
                                version = version,
                                num_retries = attempt_number - 1,
                                "transaction committed successfully"
                            );
                            return Ok(PostCommit {
                                version,
                                data: this.data,
                                create_checkpoint: this
                                    .post_commit
                                    .map(|v| v.create_checkpoint)
                                    .unwrap_or_default(),
                                cleanup_expired_logs: this
                                    .post_commit
                                    .map(|v| v.cleanup_expired_logs)
                                    .unwrap_or_default(),
                                log_store: this.log_store,
                                table_data: Some(Box::new(read_snapshot)),
                                custom_execute_handler: this.post_commit_hook_handler,
                                metrics: CommitMetrics {
                                    num_retries: (attempt_number - 1) as u64,
                                },
                            });
                        }
                        // Note: `version` here is a fresh binding shadowing the
                        // target version above — any version clash triggers a retry.
                        Err(TransactionError::VersionAlreadyExists(version)) => {
                            warn!(
                                version = version,
                                attempt = attempt_number,
                                "version already exists, will retry"
                            );
                            attempt_number += 1;
                        }
                        Err(err) => {
                            error!(
                                version = version,
                                error = %err,
                                "commit failed, aborting"
                            );
                            // Best effort: remove the staged commit entry.
                            this.log_store
                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
                                .await?;
                            return Err(err.into());
                        }
                    }
                }
                error!(
                    max_retries = this.max_retries,
                    "exceeded maximum commit attempts"
                );
                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
            }
            // Attach the tracing span to the whole retry loop.
            .instrument(commit_span)
            .await
        })
    }
}
/// A commit that has been written to the log; awaiting it runs the
/// post-commit hook (checkpointing, log cleanup).
pub struct PostCommit {
    /// The version that was committed.
    pub version: Version,
    /// The data that was committed.
    pub data: CommitData,
    // Whether to attempt a checkpoint.
    create_checkpoint: bool,
    // Cleanup override; `None` defers to the table config.
    cleanup_expired_logs: Option<bool>,
    log_store: LogStoreRef,
    // Owned snapshot of the committed table; `None` for the initial commit.
    table_data: Option<Box<dyn TableReference>>,
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    metrics: CommitMetrics,
}
impl PostCommit {
    /// Run checkpoint creation and expired-log cleanup as configured, then
    /// return the refreshed table state plus metrics about what was done.
    async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
        if let Some(table) = &self.table_data {
            let post_commit_operation_id = Uuid::new_v4();
            let mut snapshot = table.eager_snapshot().clone();
            // Bring the snapshot up to the version that was just committed.
            if self.version != snapshot.version() {
                snapshot.update(&self.log_store, Some(self.version)).await?;
            }
            let mut state = DeltaTableState { snapshot };
            // Explicit override wins; otherwise fall back to table config.
            let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
                cleanup_logs
            } else {
                state.table_config().enable_expired_log_cleanup()
            };
            if let Some(custom_execute_handler) = &self.custom_execute_handler {
                custom_execute_handler
                    .before_post_commit_hook(
                        &self.log_store,
                        cleanup_logs || self.create_checkpoint,
                        post_commit_operation_id,
                    )
                    .await?
            }
            let mut new_checkpoint_created = false;
            if self.create_checkpoint {
                new_checkpoint_created = self
                    .create_checkpoint(
                        &state,
                        &self.log_store,
                        self.version,
                        post_commit_operation_id,
                    )
                    .await?;
            }
            let mut num_log_files_cleaned_up: u64 = 0;
            if cleanup_logs {
                // Delete log files older than the configured retention window.
                num_log_files_cleaned_up = cleanup_expired_logs_for(
                    self.version,
                    self.log_store.as_ref(),
                    Utc::now().timestamp_millis()
                        - state.table_config().log_retention_duration().as_millis() as i64,
                    Some(post_commit_operation_id),
                )
                .await? as u64;
                // Cleanup invalidates the in-memory state; reload it.
                if num_log_files_cleaned_up > 0 {
                    state = DeltaTableState::try_new(
                        &self.log_store,
                        state.load_config().clone(),
                        Some(self.version),
                    )
                    .await?;
                }
            }
            if let Some(custom_execute_handler) = &self.custom_execute_handler {
                custom_execute_handler
                    .after_post_commit_hook(
                        &self.log_store,
                        cleanup_logs || self.create_checkpoint,
                        post_commit_operation_id,
                    )
                    .await?
            }
            Ok((
                state,
                PostCommitMetrics {
                    new_checkpoint_created,
                    num_log_files_cleaned_up,
                },
            ))
        } else {
            // Initial commit (no prior table data): load fresh state,
            // skipping checkpoint and cleanup.
            let state =
                DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
                    .await?;
            Ok((
                state,
                PostCommitMetrics {
                    new_checkpoint_created: false,
                    num_log_files_cleaned_up: 0,
                },
            ))
        }
    }

    /// Create a checkpoint when `version + 1` is a multiple of the table's
    /// checkpoint interval; returns whether one was written.
    async fn create_checkpoint(
        &self,
        table_state: &DeltaTableState,
        log_store: &LogStoreRef,
        version: Version,
        operation_id: Uuid,
    ) -> DeltaResult<bool> {
        // Tables loaded without file actions cannot be checkpointed.
        if !table_state.load_config().require_files {
            warn!(
                "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
            );
            return Ok(false);
        }
        let checkpoint_interval = table_state.config().checkpoint_interval().get();
        if (version + 1).is_multiple_of(checkpoint_interval) {
            create_checkpoint_for(version, log_store.as_ref(), Some(operation_id)).await?;
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
/// The result of a fully committed transaction after post-commit hooks ran.
pub struct FinalizedCommit {
    /// Table state as of the committed version.
    pub snapshot: DeltaTableState,
    /// The version that was committed.
    pub version: Version,
    /// Combined commit and post-commit metrics.
    pub metrics: Metrics,
}
impl std::fmt::Debug for FinalizedCommit {
    /// Manual `Debug` impl: the (large) snapshot is deliberately omitted,
    /// only version and metrics are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("FinalizedCommit");
        out.field("version", &self.version);
        out.field("metrics", &self.metrics);
        out.finish()
    }
}
impl FinalizedCommit {
    /// A clone of the table state as of the committed version.
    pub fn snapshot(&self) -> DeltaTableState {
        self.snapshot.clone()
    }
    /// The version that was committed.
    pub fn version(&self) -> Version {
        self.version
    }
}
impl std::future::IntoFuture for PostCommit {
    type Output = DeltaResult<FinalizedCommit>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Run the post-commit hook and assemble the [`FinalizedCommit`],
    /// merging commit metrics with post-commit metrics.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            // `?` replaces the previous match that only re-wrapped Ok/Err.
            let (snapshot, post_commit_metrics) = self.run_post_commit_hook().await?;
            Ok(FinalizedCommit {
                snapshot,
                version: self.version,
                metrics: Metrics {
                    num_retries: self.metrics.num_retries,
                    new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
                    num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
                },
            })
        })
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{LogStore, StorageConfig, default_logstore::DefaultLogStore};
    use object_store::{ObjectStore, PutPayload, memory::InMemory};
    use serde_json::json;
    use url::Url;

    /// Committing to an already-existing version must fail; committing the
    /// next version must succeed.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        // Pre-create version 0 so the first write conflicts.
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();
        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        assert!(res.is_err());
        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
    }

    /// The retry span carries the expected metadata and accepts recording
    /// into its empty fields.
    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );
        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());
        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    /// Builder setters on `CommitProperties` update the stored fields.
    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);
        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
    }

    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }

    /// `CommitData::new` injects a default `clientVersion` only when the
    /// caller did not provide one.
    #[test]
    fn test_commit_data_client_version() {
        let no_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::new(),
            vec![],
        );
        assert_eq!(
            *no_metadata.app_metadata.get("clientVersion").unwrap(),
            json!(format!("delta-rs.{}", crate_version()))
        );
        let with_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([("clientVersion".to_owned(), json!("test-client.0.0.1"))]),
            vec![],
        );
        assert_eq!(
            *with_metadata.app_metadata.get("clientVersion").unwrap(),
            json!("test-client.0.0.1")
        );
    }
}