use super::{Transport, TransportCapability, TransportMessage, MessageMetadata};
use crate::{Error, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForumMessage {
pub id: String,
pub channel: String,
pub author: String,
pub content: String,
pub timestamp: u64,
pub block_hash: String,
pub tx_index: u32,
pub reply_to: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockInfo {
pub hash: String,
pub number: u64,
pub parent_hash: String,
pub timestamp: u64,
pub tx_count: u32,
}
#[derive(Debug, Clone)]
pub struct OnChainConfig {
pub rpc_endpoint: String,
pub default_channel: String,
pub our_fingerprint: String,
}
impl Default for OnChainConfig {
fn default() -> Self {
Self {
rpc_endpoint: "http://localhost:9944".into(),
default_channel: "#quantum".into(),
our_fingerprint: String::new(),
}
}
}
pub struct OnChainTransport {
config: OnChainConfig,
connected: bool,
subscribed_channels: Vec<String>,
incoming: Arc<Mutex<VecDeque<TransportMessage>>>,
last_block: Arc<Mutex<u64>>,
}
impl OnChainTransport {
pub fn new(config: OnChainConfig) -> Self {
Self {
config,
connected: false,
subscribed_channels: vec![],
incoming: Arc::new(Mutex::new(VecDeque::new())),
last_block: Arc::new(Mutex::new(0)),
}
}
pub fn subscribe_channel(&mut self, channel: &str) {
if !self.subscribed_channels.contains(&channel.to_string()) {
self.subscribed_channels.push(channel.to_string());
tracing::info!("Subscribed to on-chain channel: {}", channel);
}
}
pub async fn get_latest_block(&self) -> Result<BlockInfo> {
if !self.connected {
return Err(Error::NodeSync("Not connected".into()));
}
Ok(BlockInfo {
hash: "0x".to_string() + &"0".repeat(64),
number: *self.last_block.lock().await,
parent_hash: "0x".to_string() + &"0".repeat(64),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
tx_count: 0,
})
}
pub async fn post_message(&self, channel: &str, content: &str) -> Result<String> {
if !self.connected {
return Err(Error::NodeSync("Not connected".into()));
}
let tx_hash = format!("0x{:064x}", rand::random::<u128>());
tracing::info!(
"Posted message to {} (tx: {})",
channel,
&tx_hash[..10]
);
Ok(tx_hash)
}
pub async fn fetch_messages(
&self,
channel: &str,
from_block: u64,
limit: usize,
) -> Result<Vec<ForumMessage>> {
if !self.connected {
return Err(Error::NodeSync("Not connected".into()));
}
tracing::debug!(
"Fetching messages from {} (blocks {} onwards, limit {})",
channel,
from_block,
limit
);
Ok(Vec::new())
}
async fn sync_messages(&self) -> Result<Vec<ForumMessage>> {
let from_block = *self.last_block.lock().await;
let mut all_messages = Vec::new();
for channel in &self.subscribed_channels {
let messages = self.fetch_messages(channel, from_block, 100).await?;
all_messages.extend(messages);
}
if let Ok(latest) = self.get_latest_block().await {
*self.last_block.lock().await = latest.number;
}
Ok(all_messages)
}
fn to_transport_message(msg: ForumMessage) -> TransportMessage {
TransportMessage {
id: msg.id.clone(),
from: msg.author,
to: msg.channel,
payload: msg.content.into_bytes(),
timestamp: msg.timestamp * 1000,
metadata: MessageMetadata::OnChain {
block_hash: msg.block_hash,
tx_index: msg.tx_index,
},
}
}
}
#[async_trait]
impl Transport for OnChainTransport {
fn name(&self) -> &str {
"OnChain"
}
fn capabilities(&self) -> Vec<TransportCapability> {
vec![
TransportCapability::Send,
TransportCapability::Receive,
TransportCapability::Persistence,
TransportCapability::GroupChannel,
]
}
async fn is_connected(&self) -> bool {
self.connected
}
async fn connect(&mut self) -> Result<()> {
tracing::info!("Connecting to {}", self.config.rpc_endpoint);
self.connected = true;
self.subscribe_channel(&self.config.default_channel.clone());
let messages = self.sync_messages().await?;
let mut incoming = self.incoming.lock().await;
for msg in messages {
incoming.push_back(Self::to_transport_message(msg));
}
Ok(())
}
async fn disconnect(&mut self) -> Result<()> {
self.connected = false;
self.subscribed_channels.clear();
Ok(())
}
async fn send(&self, message: TransportMessage) -> Result<()> {
let channel = if message.to.starts_with('#') {
&message.to
} else {
return Err(Error::Transaction(
"Direct messages should use other transports".into(),
));
};
let content = String::from_utf8(message.payload)
.map_err(|_| Error::InvalidMessage("Invalid UTF-8 content".into()))?;
self.post_message(channel, &content).await?;
Ok(())
}
async fn receive(&mut self) -> Result<TransportMessage> {
loop {
if let Some(msg) = self.incoming.lock().await.pop_front() {
return Ok(msg);
}
let messages = self.sync_messages().await?;
if !messages.is_empty() {
let mut incoming = self.incoming.lock().await;
for msg in messages {
incoming.push_back(Self::to_transport_message(msg));
}
continue;
}
tokio::time::sleep(tokio::time::Duration::from_secs(6)).await;
if !self.connected {
return Err(Error::NodeSync("Disconnected".into()));
}
}
}
async fn poll(&mut self) -> Result<Option<TransportMessage>> {
if let Ok(messages) = self.sync_messages().await {
let mut incoming = self.incoming.lock().await;
for msg in messages {
incoming.push_back(Self::to_transport_message(msg));
}
}
Ok(self.incoming.lock().await.pop_front())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_onchain_transport() {
let config = OnChainConfig::default();
let transport = OnChainTransport::new(config);
assert!(!transport.is_connected().await);
assert_eq!(transport.name(), "OnChain");
}
#[test]
fn test_forum_message_serialization() {
let msg = ForumMessage {
id: "0x123".into(),
channel: "#quantum".into(),
author: "abc".into(),
content: "Hello".into(),
timestamp: 12345,
block_hash: "0x456".into(),
tx_index: 0,
reply_to: None,
};
let json = serde_json::to_string(&msg).unwrap();
let parsed: ForumMessage = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.channel, "#quantum");
}
}