use std::borrow::Cow;
use std::fmt;
use std::fmt::{
Debug,
Formatter,
};
use std::num::NonZeroUsize;
use hedera_proto::services;
use prost::Message;
use time::Duration;
use crate::execute::execute;
use crate::signer::AnySigner;
use crate::{
AccountId,
Client,
Error,
Hbar,
Operator,
PrivateKey,
PublicKey,
Signer,
TransactionId,
TransactionResponse,
ValidateChecksums,
};
mod any;
mod chunked;
mod execute;
mod protobuf;
mod source;
#[cfg(test)]
mod tests;
pub use any::AnyTransaction;
#[cfg(feature = "ffi")]
pub(crate) use any::AnyTransactionBody;
pub(crate) use any::AnyTransactionData;
pub(crate) use chunked::{
ChunkData,
ChunkInfo,
ChunkedTransactionData,
};
#[cfg(feature = "ffi")]
pub(crate) use execute::SourceTransaction;
pub(crate) use execute::{
TransactionData,
TransactionExecute,
TransactionExecuteChunked,
};
pub(crate) use protobuf::{
ToSchedulableTransactionDataProtobuf,
ToTransactionDataProtobuf,
};
pub(crate) use source::TransactionSources;
/// Default validity window for a transaction: 120 seconds.
// NOTE(review): nothing in the visible part of this file reads this constant;
// presumably a submodule uses it as the fallback when `transaction_valid_duration`
// is unset — confirm.
const DEFAULT_TRANSACTION_VALID_DURATION: Duration = Duration::seconds(120);
/// A transaction carrying data of kind `D`.
///
/// Bundles the configurable [`TransactionBody`] with the signers registered through
/// `sign`/`sign_with` and, optionally, a set of pre-built sources (set by
/// `AnyTransaction::from_bytes` and `add_signature_signer`).
///
/// With the `ffi` feature enabled the type is serializable; only the body is
/// emitted (flattened into the outer object), `signers` and `sources` are skipped.
#[cfg_attr(feature = "ffi", derive(serde::Serialize))]
#[cfg_attr(feature = "ffi", serde(rename_all = "camelCase"))]
pub struct Transaction<D> {
// The user-configurable part of the transaction, including the frozen flag.
#[cfg_attr(feature = "ffi", serde(flatten))]
#[cfg_attr(feature = "ffi", serde(bound = "D: Into<AnyTransactionData> + Clone"))]
body: TransactionBody<D>,
// Signers registered on this transaction; `sign_signer` keeps at most one entry per public key.
#[cfg_attr(feature = "ffi", serde(skip))]
signers: Vec<AnySigner>,
// Pre-built transactions this value was decoded from or manually signed into;
// `None` until `from_bytes` or `add_signature_signer` stores them.
#[cfg_attr(feature = "ffi", serde(skip))]
sources: Option<TransactionSources>,
}
/// The configurable state of a [`Transaction`]: the kind-specific `data` plus the
/// options shared by every transaction kind.
///
/// With the `ffi` feature enabled the body is serializable with camelCase keys;
/// `skip_serializing_none` is applied, so `Option` fields that are `None` are
/// expected to be omitted from the output.
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "ffi", serde_with::skip_serializing_none)]
#[cfg_attr(feature = "ffi", derive(serde::Serialize))]
#[cfg_attr(feature = "ffi", serde(rename_all = "camelCase"))]
#[allow(clippy::type_repetition_in_bounds)]
pub(crate) struct TransactionBody<D> {
// Kind-specific transaction data. Under `ffi` it is serialized by converting it
// into `AnyTransactionData` and flattening the result into this object.
#[cfg_attr(feature = "ffi", serde(flatten))]
#[cfg_attr(
feature = "ffi",
serde(with = "serde_with::As::<serde_with::FromInto<AnyTransactionData>>")
)]
#[cfg_attr(feature = "ffi", serde(bound = "D: Into<AnyTransactionData> + Clone"))]
pub(crate) data: D,
// Nodes the transaction may be submitted to. When unset, `freeze_with` fills it
// from the client's `random_node_ids()`.
pub(crate) node_account_ids: Option<Vec<AccountId>>,
// How long the transaction stays valid; serialized as whole seconds under `ffi`.
#[cfg_attr(
feature = "ffi",
serde(with = "serde_with::As::<Option<serde_with::DurationSeconds<i64>>>")
)]
pub(crate) transaction_valid_duration: Option<Duration>,
// Explicit fee ceiling. When unset, `freeze_with` may fall back to the client's maximum.
pub(crate) max_transaction_fee: Option<Hbar>,
// Free-form memo; omitted from `ffi` serialization when empty.
#[cfg_attr(feature = "ffi", serde(skip_serializing_if = "String::is_empty"))]
pub(crate) transaction_memo: String,
// Explicit transaction ID. When unset, `make_transaction_list` generates one from `operator`.
pub(crate) transaction_id: Option<TransactionId>,
// Operator captured from the client by `freeze_with`.
pub(crate) operator: Option<Operator>,
// Set by `freeze_with`; once `true`, `body_mut`/`data_mut` panic.
// Omitted from `ffi` serialization while `false`.
#[cfg_attr(feature = "ffi", serde(skip_serializing_if = "std::ops::Not::not"))]
pub(crate) is_frozen: bool,
}
impl<D> Default for Transaction<D>
where
D: Default,
{
fn default() -> Self {
Self {
body: TransactionBody {
data: D::default(),
node_account_ids: None,
transaction_valid_duration: None,
max_transaction_fee: None,
transaction_memo: String::new(),
transaction_id: None,
operator: None,
is_frozen: false,
},
signers: Vec::new(),
sources: None,
}
}
}
/// Debug output shows only the transaction body; signers and sources are left out.
impl<D> Debug for Transaction<D>
where
    D: Debug,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Transaction");
        repr.field("body", &self.body);
        repr.finish()
    }
}
impl<D> Transaction<D>
where
    D: Default,
{
    /// Creates a new transaction; identical to [`Default::default`].
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl<D> Transaction<D> {
    /// Builds a transaction from an existing body and signer list, with no sources attached.
    pub(crate) fn from_parts(body: TransactionBody<D>, signers: Vec<AnySigner>) -> Self {
        Self { body, signers, sources: None }
    }

    /// Returns `true` once the body has been marked frozen.
    pub(crate) fn is_frozen(&self) -> bool {
        self.body.is_frozen
    }

    /// Iterates over the signers registered on this transaction.
    pub(crate) fn signers(&self) -> impl Iterator<Item = &AnySigner> {
        self.signers.iter()
    }

    /// Returns the pre-built sources, if any are attached.
    pub(crate) fn sources(&self) -> Option<&TransactionSources> {
        self.sources.as_ref()
    }

    /// Asserts that the transaction can still be modified.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub(crate) fn require_not_frozen(&self) {
        assert!(
            !self.is_frozen(),
            "transaction is immutable; it has at least one signature or has been explicitly frozen"
        );
    }

    /// Read-only access to the body.
    #[cfg(feature = "ffi")]
    pub(crate) fn body(&self) -> &TransactionBody<D> {
        &self.body
    }

    /// Mutable access to the body.
    ///
    /// # Panics
    /// If the transaction is frozen.
    // `#[track_caller]` is required on every frame between the public setter and
    // `require_not_frozen`; without it the panic location would point here instead
    // of at the user's call.
    #[track_caller]
    fn body_mut(&mut self) -> &mut TransactionBody<D> {
        self.require_not_frozen();
        &mut self.body
    }

    /// Consumes the transaction and returns its body, dropping signers and sources.
    pub(crate) fn into_body(self) -> TransactionBody<D> {
        self.body
    }

    /// Read-only access to the kind-specific data.
    pub(crate) fn data(&self) -> &D {
        &self.body.data
    }

    /// Mutable access to the kind-specific data.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub(crate) fn data_mut(&mut self) -> &mut D {
        self.require_not_frozen();
        &mut self.body.data
    }

    /// Returns the explicitly configured node account IDs, if any.
    #[must_use]
    pub fn get_node_account_ids(&self) -> Option<&[AccountId]> {
        self.body.node_account_ids.as_deref()
    }

    /// Sets the account IDs of the nodes this transaction may be submitted to.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn node_account_ids(&mut self, ids: impl IntoIterator<Item = AccountId>) -> &mut Self {
        self.body_mut().node_account_ids = Some(ids.into_iter().collect());
        self
    }

    /// Returns the configured validity duration, if any.
    #[must_use]
    pub fn get_transaction_valid_duration(&self) -> Option<Duration> {
        self.body.transaction_valid_duration
    }

    /// Sets how long the transaction stays valid.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn transaction_valid_duration(&mut self, duration: Duration) -> &mut Self {
        self.body_mut().transaction_valid_duration = Some(duration);
        self
    }

    /// Returns the configured maximum transaction fee, if any.
    #[must_use]
    pub fn get_max_transaction_fee(&self) -> Option<Hbar> {
        self.body.max_transaction_fee
    }

    /// Sets the maximum fee for this transaction.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn max_transaction_fee(&mut self, fee: Hbar) -> &mut Self {
        self.body_mut().max_transaction_fee = Some(fee);
        self
    }

    /// Returns the transaction memo (empty when none was set).
    #[must_use]
    pub fn get_transaction_memo(&self) -> &str {
        &self.body.transaction_memo
    }

    /// Sets the transaction memo.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn transaction_memo(&mut self, memo: impl AsRef<str>) -> &mut Self {
        self.body_mut().transaction_memo = memo.as_ref().to_owned();
        self
    }

    /// Returns the explicitly configured transaction ID, if any.
    #[must_use]
    pub fn get_transaction_id(&self) -> Option<TransactionId> {
        self.body.transaction_id
    }

    /// Sets an explicit transaction ID.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn transaction_id(&mut self, id: TransactionId) -> &mut Self {
        self.body_mut().transaction_id = Some(id);
        self
    }

    /// Registers `private_key` as a signer of this transaction.
    pub fn sign(&mut self, private_key: PrivateKey) -> &mut Self {
        self.sign_signer(AnySigner::PrivateKey(private_key))
    }

    /// Registers a custom `signer` that signs on behalf of `public_key`.
    pub fn sign_with(&mut self, public_key: PublicKey, signer: Signer) -> &mut Self {
        self.sign_signer(AnySigner::Arbitrary(Box::new(public_key), signer))
    }

    /// Registers `signer` unless a signer with the same public key is already present.
    ///
    /// Unlike the setters above, this does not check the frozen flag.
    pub(crate) fn sign_signer(&mut self, signer: AnySigner) -> &mut Self {
        let already_registered =
            self.signers.iter().any(|it| it.public_key() == signer.public_key());

        if !already_registered {
            self.signers.push(signer);
        }

        self
    }
}
impl<D: ChunkedTransactionData> Transaction<D> {
    /// Returns the maximum number of chunks configured for this transaction.
    #[must_use]
    pub fn get_max_chunks(&self) -> usize {
        self.data().chunk_data().max_chunks
    }

    /// Sets the maximum number of chunks.
    ///
    /// # Panics
    /// If the transaction is frozen.
    #[track_caller]
    pub fn max_chunks(&mut self, max_chunks: usize) -> &mut Self {
        self.data_mut().chunk_data_mut().max_chunks = max_chunks;
        self
    }

    /// Returns the configured size of a single chunk.
    // `#[must_use]` added for consistency with `get_max_chunks` and the other getters.
    #[must_use]
    pub fn get_chunk_size(&self) -> usize {
        self.data().chunk_data().chunk_size.get()
    }

    /// Sets the size of a single chunk.
    ///
    /// # Panics
    /// If `size` is zero, or if the transaction is frozen.
    #[track_caller]
    pub fn chunk_size(&mut self, size: usize) -> &mut Self {
        // The zero check runs first, so a zero size panics even on a frozen transaction.
        let Some(size) = NonZeroUsize::new(size) else {
            panic!("Cannot set chunk-size to zero")
        };

        self.data_mut().chunk_data_mut().chunk_size = size;
        self
    }
}
impl<D: ValidateChecksums> Transaction<D> {
pub fn freeze(&mut self) -> crate::Result<&mut Self> {
self.freeze_with(None)
}
pub fn freeze_with<'a>(
&mut self,
client: impl Into<Option<&'a Client>>,
) -> crate::Result<&mut Self> {
if self.is_frozen() {
return Ok(self);
}
let client: Option<&Client> = client.into();
let node_account_ids = match &self.body.node_account_ids {
Some(it) => it.clone(),
None => {
client.ok_or_else(|| crate::Error::FreezeUnsetNodeAccountIds)?.random_node_ids()
}
};
let max_transaction_fee = self.body.max_transaction_fee.or_else(|| {
let client_max_transaction_fee = client
.map(|it| it.max_transaction_fee().load(std::sync::atomic::Ordering::Relaxed));
match client_max_transaction_fee {
Some(max) if max > 1 => Some(Hbar::from_tinybars(max as i64)),
_ => None,
}
});
let operator = client.and_then(|it| it.operator_internal().as_deref().map(|it| it.clone()));
self.body.node_account_ids = Some(node_account_ids);
self.body.max_transaction_fee = max_transaction_fee;
self.body.operator = operator;
self.body.is_frozen = true;
if let Some(client) = client {
if client.auto_validate_checksums() {
if let Some(ledger_id) = &*client.ledger_id_internal() {
self.validate_checksums(ledger_id)?;
} else {
return Err(crate::Error::CannotPerformTaskWithoutLedgerId {
task: "validate checksums",
});
}
}
}
Ok(self)
}
}
impl<D: TransactionExecute> Transaction<D> {
fn make_transaction_list(&self) -> crate::Result<Vec<services::Transaction>> {
assert!(self.is_frozen());
let initial_transaction_id = match self.get_transaction_id() {
Some(id) => id,
None => self
.body
.operator
.as_ref()
.ok_or(crate::Error::NoPayerAccountOrTransactionId)?
.generate_transaction_id(),
};
let transaction_list = {
let used_chunks = self.data().maybe_chunk_data().map_or(1, |it| it.used_chunks());
let node_account_ids = self.body.node_account_ids.as_deref().unwrap();
let mut transaction_list = Vec::with_capacity(used_chunks * node_account_ids.len());
for chunk in 0..used_chunks {
let current_transaction_id = match chunk {
0 => initial_transaction_id,
_ => self
.body
.operator
.as_ref()
.ok_or(crate::Error::NoPayerAccountOrTransactionId)?
.generate_transaction_id(),
};
for node_account_id in node_account_ids.iter().copied() {
let chunk_info = ChunkInfo {
current: chunk,
total: used_chunks,
initial_transaction_id,
current_transaction_id,
node_account_id,
};
transaction_list.push(self.make_request_inner(&chunk_info)?.0);
}
}
transaction_list
};
Ok(transaction_list)
}
pub(crate) fn make_sources(&self) -> crate::Result<Cow<'_, TransactionSources>> {
assert!(self.is_frozen());
if let Some(sources) = &self.sources {
return Ok(sources.sign_with(&self.signers));
}
return Ok(Cow::Owned(TransactionSources::new(self.make_transaction_list()?).unwrap()));
}
pub fn to_bytes(&self) -> crate::Result<Vec<u8>> {
assert!(self.is_frozen(), "Transaction must be frozen to call `to_bytes`");
let transaction_list = self
.sources
.as_ref()
.map(|it| Ok(it.transactions().to_vec()))
.unwrap_or_else(|| self.make_transaction_list())?;
Ok(hedera_proto::sdk::TransactionList { transaction_list }.encode_to_vec())
}
pub(crate) fn add_signature_signer(&mut self, signer: AnySigner) {
assert!(self.is_frozen());
if self.body.node_account_ids.as_deref().map_or(0, |it| it.len()) != 1 {
panic!("cannot manually add a signature to a transaction with multiple nodes")
}
if let Some(chunk_data) = self.data().maybe_chunk_data() {
assert!(
chunk_data.used_chunks() <= 1,
"cannot manually add a signature to a chunked transaction with multiple chunks (message length `{}` > chunk size `{}`)",
chunk_data.data.len(),
chunk_data.chunk_size
)
}
let sources = self.make_sources().unwrap();
assert!(sources.transactions().len() == 1);
let sources = sources.sign_with(std::slice::from_ref(&signer));
match sources {
Cow::Owned(it) => self.sources = Some(it),
Cow::Borrowed(_) => {}
}
}
pub(crate) fn _add_signature(&mut self, pk: PublicKey, signature: Vec<u8>) -> &mut Self {
self.add_signature_signer(AnySigner::Arbitrary(
Box::new(pk),
Box::new(move |_| signature.clone()),
));
self
}
}
impl<D> Transaction<D>
where
    D: TransactionExecute,
{
    /// Freezes the transaction with `client` and executes it without a timeout.
    ///
    /// # Errors
    /// Any error from freezing or from execution.
    pub async fn execute(&mut self, client: &Client) -> crate::Result<TransactionResponse> {
        self.execute_with_optional_timeout(client, None).await
    }

    /// Freezes the transaction with `client` and executes it, optionally bounded by `timeout`.
    ///
    /// Dispatch order: attached sources are executed as-is; otherwise chunked data is
    /// executed chunk by chunk and the first response is returned; otherwise the
    /// transaction is executed directly.
    pub(crate) async fn execute_with_optional_timeout(
        &mut self,
        client: &Client,
        timeout: Option<std::time::Duration>,
    ) -> crate::Result<TransactionResponse> {
        self.freeze_with(Some(client))?;

        if let Some(sources) = &self.sources {
            let source_transaction = self::execute::SourceTransaction::new(&self, &sources);
            return source_transaction.execute(client, timeout).await;
        }

        if let Some(chunk_data) = self.data().maybe_chunk_data() {
            let mut all_responses = self.execute_all_inner(chunk_data, client, timeout).await?;
            // Only the response for the first chunk is reported to the caller.
            return Ok(all_responses.swap_remove(0));
        }

        execute(client, self, timeout).await
    }

    /// Executes every chunk in order and collects one response per chunk.
    ///
    /// The first chunk's transaction ID is passed to all later chunks as the initial
    /// transaction ID. When the data asks for it, each chunk's receipt is awaited
    /// before the next chunk is sent.
    ///
    /// # Panics
    /// If the transaction is not frozen, or if the data is longer than
    /// `chunk_data.max_message_len()`.
    async fn execute_all_inner(
        &self,
        chunk_data: &ChunkData,
        client: &Client,
        timeout_per_chunk: Option<std::time::Duration>,
    ) -> crate::Result<Vec<TransactionResponse>> {
        assert!(self.is_frozen());

        let wait_for_receipts = self.data().wait_for_receipt();

        assert!(
            chunk_data.data.len() <= chunk_data.max_message_len(),
            "error: message too big"
        );

        let total_chunks = chunk_data.used_chunks();
        let mut responses = Vec::with_capacity(total_chunks);

        // First chunk: its transaction ID becomes the initial ID for the rest.
        let first_response = execute(
            client,
            &chunked::FirstChunkView { transaction: self, total_chunks },
            timeout_per_chunk,
        )
        .await?;

        if wait_for_receipts {
            first_response.get_receipt(client).await?;
        }

        let initial_transaction_id = first_response.transaction_id;
        responses.push(first_response);

        // Remaining chunks.
        for current_chunk in 1..total_chunks {
            let response = execute(
                client,
                &chunked::ChunkView {
                    transaction: self,
                    initial_transaction_id,
                    current_chunk,
                    total_chunks,
                },
                timeout_per_chunk,
            )
            .await?;

            if wait_for_receipts {
                response.get_receipt(client).await?;
            }

            responses.push(response);
        }

        Ok(responses)
    }

    /// Freezes the transaction with `client` and executes it, bounded by `timeout`.
    #[allow(clippy::missing_errors_doc)]
    pub async fn execute_with_timeout(
        &mut self,
        client: &Client,
        timeout: std::time::Duration,
    ) -> crate::Result<TransactionResponse> {
        let timeout = Some(timeout);
        self.execute_with_optional_timeout(client, timeout).await
    }
}
impl<D> Transaction<D>
where
    D: TransactionExecuteChunked,
{
    /// Freezes the transaction with `client` and executes all of its chunks without
    /// a timeout, returning one response per chunk.
    ///
    /// # Errors
    /// Any error from freezing or from executing a chunk.
    pub async fn execute_all(
        &mut self,
        client: &Client,
    ) -> crate::Result<Vec<TransactionResponse>> {
        self.execute_all_with_optional_timeout(client, None).await
    }

    /// Freezes the transaction with `client` and executes all of its chunks, each
    /// optionally bounded by `timeout_per_chunk`.
    ///
    /// Attached sources are executed as-is. Data without chunk information is
    /// executed as a single transaction and returned as a one-element list.
    pub(crate) async fn execute_all_with_optional_timeout(
        &mut self,
        client: &Client,
        timeout_per_chunk: Option<std::time::Duration>,
    ) -> crate::Result<Vec<TransactionResponse>> {
        self.freeze_with(Some(client))?;

        if let Some(sources) = &self.sources {
            let source_transaction = self::execute::SourceTransaction::new(&self, &sources);
            return source_transaction.execute_all(client, timeout_per_chunk).await;
        }

        let Some(chunk_data) = self.data().maybe_chunk_data() else {
            let only_response =
                self.execute_with_optional_timeout(client, timeout_per_chunk).await?;
            return Ok(vec![only_response]);
        };

        self.execute_all_inner(chunk_data, client, timeout_per_chunk).await
    }
}
impl AnyTransaction {
#[allow(deprecated)]
pub fn from_bytes(bytes: &[u8]) -> crate::Result<Self> {
let list =
hedera_proto::sdk::TransactionList::decode(bytes).map_err(Error::from_protobuf)?;
let list = if list.transaction_list.is_empty() {
Vec::from([services::Transaction::decode(bytes).map_err(Error::from_protobuf)?])
} else {
list.transaction_list
};
let sources = TransactionSources::new(list)?;
let transaction_bodies: Result<Vec<_>, _> = sources
.signed_transactions()
.iter()
.map(|it| {
services::TransactionBody::decode(&*it.body_bytes).map_err(Error::from_protobuf)
})
.collect();
let transaction_bodies = transaction_bodies?;
{
let (first, transaction_bodies) = transaction_bodies
.split_first()
.ok_or_else(|| Error::from_protobuf("no transactions found"))?;
for it in transaction_bodies.iter() {
if !pb_transaction_body_eq(first, it) {
return Err(Error::from_protobuf("transaction parts unexpectedly unequal"));
}
}
}
let transaction_data = {
let data: Result<_, _> = sources
.chunks()
.filter_map(|it| it.signed_transactions().first())
.map(|it| {
services::TransactionBody::decode(&*it.body_bytes)
.map_err(Error::from_protobuf)
.and_then(|pb| pb_getf!(pb, data))
})
.collect();
data?
};
let mut res = Self::from_protobuf(transaction_bodies[0].clone(), transaction_data)?;
res.body.node_account_ids = Some(sources.node_ids().to_vec());
res.sources = Some(sources);
Ok(res)
}
}
/// Returns whether two protobuf transaction bodies describe the same logical transaction.
///
/// Fields that legitimately differ between the parts of one transaction are ignored:
/// the transaction ID and node account ID, the message contents and chunk number of a
/// consensus message, and the contents of a file append. Everything else must be equal.
///
/// Every struct is destructured without `..`, so adding a field to one of the
/// protobuf types makes this function fail to compile until the field is handled.
#[allow(deprecated)]
fn pb_transaction_body_eq(
    lhs: &services::TransactionBody,
    rhs: &services::TransactionBody,
) -> bool {
    let services::TransactionBody {
        transaction_id: _,
        node_account_id: _,
        transaction_fee,
        transaction_valid_duration,
        generate_record,
        memo,
        data,
    } = rhs;

    let shared_fields_equal = lhs.transaction_fee == *transaction_fee
        && lhs.transaction_valid_duration == *transaction_valid_duration
        && lhs.generate_record == *generate_record
        && lhs.memo == *memo;

    if !shared_fields_equal {
        return false;
    }

    match (&lhs.data, data) {
        (None, None) => true,

        (
            Some(services::transaction_body::Data::ConsensusSubmitMessage(lhs)),
            Some(services::transaction_body::Data::ConsensusSubmitMessage(rhs)),
        ) => {
            let services::ConsensusSubmitMessageTransactionBody {
                topic_id,
                message: _,
                chunk_info,
            } = rhs;

            lhs.topic_id == *topic_id
                && match (lhs.chunk_info.as_ref(), chunk_info.as_ref()) {
                    (None, None) => true,
                    (Some(lhs), Some(rhs)) => {
                        let services::ConsensusMessageChunkInfo {
                            initial_transaction_id,
                            total,
                            number: _,
                        } = rhs;

                        lhs.initial_transaction_id == *initial_transaction_id
                            && lhs.total == *total
                    }
                    (Some(_), None) | (None, Some(_)) => false,
                }
        }

        (
            Some(services::transaction_body::Data::FileAppend(lhs)),
            Some(services::transaction_body::Data::FileAppend(rhs)),
        ) => {
            let services::FileAppendTransactionBody { file_id, contents: _ } = rhs;

            lhs.file_id == *file_id
        }

        // Any other pairing (including mismatched variants) is compared in full.
        (Some(lhs), Some(rhs)) => lhs == rhs,

        (Some(_), None) | (None, Some(_)) => false,
    }
}