geph5-client 0.2.99

Geph5 client
Documentation
use anyctx::AnyCtx;
use geph5_broker_protocol::{ExitConstraint, ExitRouteDescriptor};
use sqlx::Row;

use crate::{client::Config, database::DATABASE};

/// Creates the `exit_route_cache` table in the context's database unless it already exists.
///
/// The table has two text columns: `exit_constraint` (the primary key) and `route`
/// (non-null). Because the statement uses `IF NOT EXISTS`, calling this repeatedly is fine.
///
/// # Errors
///
/// Returns the `sqlx::Error` produced while executing the statement.
async fn ensure_exit_route_cache_table(ctx: &AnyCtx<Config>) -> Result<(), sqlx::Error> {
    let create_table = sqlx::query(
        "CREATE TABLE IF NOT EXISTS exit_route_cache (
            exit_constraint TEXT PRIMARY KEY,
            route TEXT NOT NULL
        );",
    );
    // Callers only care whether the statement succeeded, so the execution summary is dropped.
    create_table.execute(ctx.get(DATABASE)).await.map(|_| ())
}

fn serialize_exit_constraint(exit_constraint: &ExitConstraint) -> anyhow::Result<String> {
    Ok(serde_json::to_string(exit_constraint)?)
}

/// Stores `route` as the cached exit route for `exit_constraint`.
///
/// The cache table is created first if needed. The constraint (used as the key) and the route
/// are both persisted as JSON text; if a row with the same key already exists, its route is
/// replaced.
///
/// # Errors
///
/// Fails if the table cannot be created, if either value cannot be serialized, or if the
/// insert/update statement fails.
pub async fn write_cached_exit_route(
    ctx: &AnyCtx<Config>,
    exit_constraint: &ExitConstraint,
    route: &ExitRouteDescriptor,
) -> anyhow::Result<()> {
    ensure_exit_route_cache_table(ctx).await?;

    // Serialize after the table exists so the fallible steps run in the same order as before.
    let cache_key = serialize_exit_constraint(exit_constraint)?;
    let route_json = serde_json::to_string(route)?;

    let upsert = sqlx::query(
        "INSERT INTO exit_route_cache (exit_constraint, route) VALUES (?, ?)
         ON CONFLICT(exit_constraint) DO UPDATE SET route = excluded.route",
    )
    .bind(cache_key)
    .bind(route_json);
    upsert.execute(ctx.get(DATABASE)).await?;
    Ok(())
}

/// Looks up the cached exit route stored for `exit_constraint`.
///
/// The cache table is created first if needed. Returns `Ok(Some(route))` when a row exists and
/// its JSON deserializes, and `Ok(None)` when there is no row. A row whose JSON does not
/// deserialize is treated as a miss: a warning is logged, the row is deleted, and `Ok(None)` is
/// returned.
///
/// # Errors
///
/// Fails if the table cannot be created, the constraint cannot be serialized, the lookup or
/// delete query fails, or the `route` column cannot be decoded as a string.
pub async fn read_cached_exit_route(
    ctx: &AnyCtx<Config>,
    exit_constraint: &ExitConstraint,
) -> anyhow::Result<Option<ExitRouteDescriptor>> {
    ensure_exit_route_cache_table(ctx).await?;
    let cache_key = serialize_exit_constraint(exit_constraint)?;
    let cached = sqlx::query("SELECT route FROM exit_route_cache WHERE exit_constraint = ?")
        .bind(&cache_key)
        .fetch_optional(ctx.get(DATABASE))
        .await?;
    let Some(row) = cached else {
        return Ok(None);
    };

    // `Row::get` panics when the value cannot be decoded; `try_get` surfaces that as an error
    // so a damaged cache row can never take the caller down.
    let raw_route: String = row.try_get("route")?;
    match serde_json::from_str(&raw_route) {
        Ok(route) => Ok(Some(route)),
        Err(err) => {
            tracing::warn!(
                err = debug(&err),
                exit_constraint = %cache_key,
                "cached exit route was corrupt, deleting row"
            );
            // Remove the unreadable entry so later reads see a clean miss.
            sqlx::query("DELETE FROM exit_route_cache WHERE exit_constraint = ?")
                .bind(cache_key)
                .execute(ctx.get(DATABASE))
                .await?;
            Ok(None)
        }
    }
}

#[cfg(test)]
mod tests {
    //! Tests for the exit route cache: a write/read/overwrite round trip, and removal of a row
    //! whose stored route is not valid JSON.

    use std::path::PathBuf;

    use anyctx::AnyCtx;
    use ed25519_dalek::SigningKey;
    use geph5_broker_protocol::{
        ExitConstraint, ExitDescriptor, ExitRouteDescriptor, RouteDescriptor,
    };
    use isocountry::CountryCode;

    use super::{read_cached_exit_route, write_cached_exit_route};
    use crate::{client::Config, database::DATABASE};

    /// Returns a randomly named `.db` path inside the system temp directory.
    fn temp_db_path() -> PathBuf {
        let mut path = std::env::temp_dir();
        path.push(format!(
            "geph5-route-cache-test-{}.db",
            rand::random::<u64>()
        ));
        path
    }

    /// Builds a dry-run config with all `*_listen` fields unset and `cache` pointing at a
    /// freshly generated temp path.
    fn test_config() -> Config {
        let cache = Some(temp_db_path());
        Config {
            socks5_listen: None,
            http_proxy_listen: None,
            pac_listen: None,
            control_listen: None,
            exit_constraint: ExitConstraint::Auto,
            allow_direct: false,
            cache,
            broker: None,
            tunneled_broker: None,
            broker_keys: None,
            port_forward: vec![],
            vpn: false,
            vpn_fd: None,
            spoof_dns: false,
            passthrough_china: false,
            dry_run: true,
            credentials: Default::default(),
            sess_metadata: serde_json::Value::Null,
            task_limit: None,
        }
    }

    /// Builds a loopback TCP route descriptor: `c2e_listen` uses `port`, `b2e_listen` uses
    /// `port + 1`, and the route address uses `port + 2`. The exit key comes from a fixed seed.
    fn sample_route(port: u16) -> ExitRouteDescriptor {
        let exit_pubkey = SigningKey::from_bytes(&[7; 32]).verifying_key();
        let c2e_listen = format!("127.0.0.1:{port}").parse().unwrap();
        let b2e_listen = format!("127.0.0.1:{}", port + 1).parse().unwrap();
        let route_addr = format!("127.0.0.1:{}", port + 2).parse().unwrap();

        let exit = ExitDescriptor {
            c2e_listen,
            b2e_listen,
            country: CountryCode::CAN,
            city: "Toronto".into(),
            load: 0.1,
            expiry: 1,
        };
        ExitRouteDescriptor {
            exit_pubkey,
            exit,
            route: RouteDescriptor::Tcp(route_addr),
        }
    }

    #[test]
    fn cached_exit_route_round_trips_and_overwrites() {
        smolscale::block_on(async {
            let ctx = AnyCtx::new(test_config());
            let constraint = ExitConstraint::Country(CountryCode::CAN);

            // Both passes write under the same constraint, so the second must replace the first.
            for (port, expected_c2e) in [(9000, "127.0.0.1:9000"), (9100, "127.0.0.1:9100")] {
                let route = sample_route(port);
                write_cached_exit_route(&ctx, &constraint, &route)
                    .await
                    .unwrap();

                let cached = read_cached_exit_route(&ctx, &constraint).await.unwrap();
                assert_eq!(
                    cached.unwrap().exit.c2e_listen,
                    expected_c2e.parse().unwrap()
                );
            }
        });
    }

    #[test]
    fn corrupt_cached_exit_route_is_deleted() {
        smolscale::block_on(async {
            let ctx = AnyCtx::new(test_config());
            let constraint = ExitConstraint::Hostname("example.com".into());
            let cache_key = serde_json::to_string(&constraint).unwrap();

            // Seed the table by hand so a row with an unparseable route can be planted.
            sqlx::query(
                "CREATE TABLE IF NOT EXISTS exit_route_cache (
                    exit_constraint TEXT PRIMARY KEY,
                    route TEXT NOT NULL
                );",
            )
            .execute(ctx.get(DATABASE))
            .await
            .unwrap();
            sqlx::query("INSERT INTO exit_route_cache (exit_constraint, route) VALUES (?, ?)")
                .bind(&cache_key)
                .bind("{not-json")
                .execute(ctx.get(DATABASE))
                .await
                .unwrap();

            // Reading the corrupt entry must report a miss rather than an error.
            let outcome = read_cached_exit_route(&ctx, &constraint).await.unwrap();
            assert!(outcome.is_none());

            // The read must also have removed the bad row.
            let leftover = sqlx::query_scalar::<_, String>(
                "SELECT route FROM exit_route_cache WHERE exit_constraint = ?",
            )
            .bind(cache_key)
            .fetch_optional(ctx.get(DATABASE))
            .await
            .unwrap();
            assert!(leftover.is_none());
        });
    }
}