pub mod cache;
pub mod msgbus;
pub mod queries;
use std::time::Duration;
use nautilus_common::{
logging::log_task_awaiting,
msgbus::database::{DatabaseConfig, MessageBusConfig},
};
use nautilus_core::{UUID4, string::SemVer};
use nautilus_model::identifiers::TraderId;
use redis::RedisError;
const REDIS_MIN_VERSION: &str = "6.2.0";
const REDIS_DELIMITER: char = ':';
const REDIS_INDEX_PATTERN: &str = ":index:";
const REDIS_XTRIM: &str = "XTRIM";
const REDIS_MINID: &str = "MINID";
const REDIS_FLUSHDB: &str = "FLUSHDB";
pub(crate) fn get_index_key(key: &str) -> anyhow::Result<&str> {
if let Some(pos) = key.find(REDIS_INDEX_PATTERN) {
return Ok(&key[pos + 1..]);
}
if key.starts_with("index:") {
return Ok(key);
}
anyhow::bail!("Invalid index key format: {key}")
}
async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
if let Some(handle) = handle {
log_task_awaiting(task_name);
let timeout = Duration::from_secs(2);
match tokio::time::timeout(timeout, handle).await {
Ok(result) => {
if let Err(e) = result {
log::error!("Error awaiting task '{task_name}': {e:?}");
}
}
Err(_) => {
log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
}
}
}
}
#[must_use]
pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
let host = config.host.unwrap_or("127.0.0.1".to_string());
let port = config.port.unwrap_or(6379);
let username = config.username.unwrap_or_default();
let password = config.password.unwrap_or_default();
let ssl = config.ssl;
let redact_pw = |pw: &str| {
if pw.len() > 4 {
format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
} else {
pw.to_owned()
}
};
let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
(false, false) => (
format!("{username}:{password}@"),
format!("{username}:{}@", redact_pw(&password)),
),
(true, false) => (
format!(":{password}@"),
format!(":{}@", redact_pw(&password)),
),
(false, true) => panic!(
"Redis config error: username supplied without password. \
Either supply a password or omit the username."
),
(true, true) => (String::new(), String::new()),
};
let scheme = if ssl { "rediss" } else { "redis" };
let url = format!("{scheme}://{auth}{host}:{port}");
let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
(url, redacted_url)
}
pub async fn create_redis_connection(
con_name: &str,
config: DatabaseConfig,
) -> anyhow::Result<redis::aio::ConnectionManager> {
log::debug!("Creating {con_name} redis connection");
let (redis_url, redacted_url) = get_redis_url(config.clone());
log::debug!("Connecting to {redacted_url}");
let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
let number_of_retries = config.number_of_retries;
let exponent_base = config.exponent_base as f32;
let min_delay = Duration::from_millis(config.factor);
let max_delay = Duration::from_secs(config.max_delay);
let client = redis::Client::open(redis_url)?;
let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
.set_exponent_base(exponent_base)
.set_number_of_retries(number_of_retries)
.set_response_timeout(Some(response_timeout))
.set_connection_timeout(Some(connection_timeout))
.set_min_delay(min_delay)
.set_max_delay(max_delay);
let mut con = client
.get_connection_manager_with_config(connection_manager_config)
.await?;
let version = get_redis_version(&mut con).await?;
let min_version = SemVer::parse(REDIS_MIN_VERSION)?;
let con_msg = format!("Connected to redis v{version}");
if version >= min_version {
log::info!("{con_msg}");
} else {
log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
}
Ok(con)
}
pub async fn flush_redis(
con: &mut redis::aio::ConnectionManager,
) -> anyhow::Result<(), RedisError> {
redis::cmd(REDIS_FLUSHDB).exec_async(con).await
}
#[must_use]
pub fn get_stream_key(
trader_id: TraderId,
instance_id: UUID4,
config: &MessageBusConfig,
) -> String {
let mut stream_key = String::new();
if config.use_trader_prefix {
stream_key.push_str("trader-");
}
if config.use_trader_id {
stream_key.push_str(trader_id.as_str());
stream_key.push(REDIS_DELIMITER);
}
if config.use_instance_id {
stream_key.push_str(&format!("{instance_id}"));
stream_key.push(REDIS_DELIMITER);
}
stream_key.push_str(&config.streams_prefix);
stream_key
}
async fn get_redis_version(conn: &mut redis::aio::ConnectionManager) -> anyhow::Result<SemVer> {
let info: String = redis::cmd("INFO").query_async(conn).await?;
let version_str = match info.lines().find_map(|line| {
if line.starts_with("redis_version:") {
line.split(':').nth(1).map(|s| s.trim().to_string())
} else {
None
}
}) {
Some(info) => info,
None => {
anyhow::bail!("Redis version not available");
}
};
SemVer::parse(&version_str)
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;
use super::*;
#[rstest]
fn test_get_redis_url_default_values() {
let config: DatabaseConfig = serde_json::from_value(json!({})).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "redis://127.0.0.1:6379");
assert_eq!(redacted_url, "redis://127.0.0.1:6379");
}
#[rstest]
fn test_get_redis_url_password_only() {
let config_json = json!({
"host": "example.com",
"port": 6380,
"password": "secretpw", });
let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "redis://:secretpw@example.com:6380");
assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
}
#[rstest]
fn test_get_redis_url_full_config_with_ssl() {
let config_json = json!({
"host": "example.com",
"port": 6380,
"username": "user",
"password": "pass",
"ssl": true,
});
let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "rediss://user:pass@example.com:6380");
assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
}
#[rstest]
fn test_get_redis_url_full_config_without_ssl() {
let config_json = json!({
"host": "example.com",
"port": 6380,
"username": "username",
"password": "password",
"ssl": false,
});
let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "redis://username:password@example.com:6380");
assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
}
#[rstest]
fn test_get_redis_url_missing_username_and_password() {
let config_json = json!({
"host": "example.com",
"port": 6380,
"ssl": false,
});
let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "redis://example.com:6380");
assert_eq!(redacted_url, "redis://example.com:6380");
}
#[rstest]
fn test_get_redis_url_ssl_default_false() {
let config_json = json!({
"host": "example.com",
"port": 6380,
"username": "username",
"password": "password",
});
let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
let (url, redacted_url) = get_redis_url(config);
assert_eq!(url, "redis://username:password@example.com:6380");
assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
}
#[rstest]
fn test_get_stream_key_with_trader_prefix_and_instance_id() {
let trader_id = TraderId::from("tester-123");
let instance_id = UUID4::new();
let config = MessageBusConfig {
use_instance_id: true,
..Default::default()
};
let key = get_stream_key(trader_id, instance_id, &config);
assert_eq!(key, format!("trader-tester-123:{instance_id}:stream"));
}
#[rstest]
fn test_get_stream_key_without_trader_prefix_or_instance_id() {
let trader_id = TraderId::from("tester-123");
let instance_id = UUID4::new();
let config = MessageBusConfig {
use_trader_prefix: false,
use_trader_id: false,
..Default::default()
};
let key = get_stream_key(trader_id, instance_id, &config);
assert_eq!(key, format!("stream"));
}
#[rstest]
fn test_get_index_key_without_prefix() {
let key = "index:order_position";
assert_eq!(get_index_key(key).unwrap(), "index:order_position");
}
#[rstest]
fn test_get_index_key_with_trader_prefix() {
let key = "trader-tester-123:index:order_position";
assert_eq!(get_index_key(key).unwrap(), "index:order_position");
}
#[rstest]
fn test_get_index_key_with_instance_id() {
let key = "trader-tester-123:abc-uuid-123:index:order_position";
assert_eq!(get_index_key(key).unwrap(), "index:order_position");
}
#[rstest]
fn test_get_index_key_invalid() {
let key = "no_index_pattern";
assert!(get_index_key(key).is_err());
}
}