hedera 0.9.0

The SDK for interacting with Hedera Hashgraph.
/*
 * ‌
 * Hedera Rust SDK
 * ​
 * Copyright (C) 2022 - 2023 Hedera Hashgraph, LLC
 * ​
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ‍
 */

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use clap::Parser;
// `use futures::TryStreamExt`, this is better practice though.
use futures_util::TryStreamExt;
use hedera::{
    AccountId, Client, PrivateKey, TopicId, TopicMessageQuery, TopicMessageSubmitTransaction
};
use parking_lot::RwLock;
use tokio::time::sleep;

#[derive(Parser, Debug)]
struct Args {
    #[clap(long, env)]
    operator_account_id: AccountId,

    #[clap(long, env)]
    operator_key: PrivateKey,

    #[clap(long, env, default_value = "0.0.34945875")]
    topic: TopicId,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let _ = dotenvy::dotenv();
    let args = Args::parse();

    let client = Client::for_testnet();

    client.set_operator(args.operator_account_id, args.operator_key);

    let message_send_times = Arc::new(RwLock::new(HashMap::new()));

    tokio::spawn({
        let client = client.clone();
        let message_send_times = message_send_times.clone();

        async move {
            for index in 0.. {
                let message = format!("hello, {index}");
                let time = Instant::now();

                message_send_times.write().insert(message.clone(), time);

                // send a message, crash the example program if it fails
                TopicMessageSubmitTransaction::new()
                    .topic_id(args.topic)
                    .message(message.as_bytes())
                    .execute(&client)
                    .await
                    .unwrap();

                sleep(Duration::from_millis(500)).await;
            }
        }
    });

    let mut stream = TopicMessageQuery::new()
        .topic_id(args.topic)
        .subscribe(&client);

    let mut latencies = Vec::new();

    while let Some(tm) = stream.try_next().await? {
        let message = String::from_utf8(tm.contents)?;

        let times = message_send_times.read();
        let start = times.get(&message).unwrap();
        let latency = start.elapsed();

        println!(
            "recv: {}, message: {:?}, latency: {:.3?}",
            tm.sequence_number, message, latency,
        );

        latencies.push(latency.as_secs_f64());

        if latencies.len() == 100 {
            let avg: f64 = latencies.iter().copied().sum::<f64>() / (latencies.len() as f64);

            println!("---- average latency: {:.5}s ----", avg);

            latencies.clear();
        }
    }

    Ok(())
}