use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use futures::TryStreamExt;
use k8s_openapi::{
api::core::v1::NodeAddress,
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceDefinition, CustomResourceDefinitionNames, CustomResourceDefinitionSpec,
CustomResourceDefinitionVersion, CustomResourceValidation,
},
apimachinery::pkg::{apis::meta::v1::ObjectMeta, util::intstr::IntOrString},
};
use kube::core::Resource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::net::endpoint::{Endpoint, Locality};
const QUILKIN_TOKEN_LABEL: &str = "quilkin.dev/tokens";
pub async fn watch(
gameservers_namespace: String,
config_namespace: Option<String>,
health_check: Arc<AtomicBool>,
locality: Option<Locality>,
filters: crate::config::filter::FilterChainConfig,
clusters: crate::config::Watch<crate::net::ClusterMap>,
address_selector: Option<crate::config::AddressSelector>,
) -> crate::Result<()> {
let client = tokio::time::timeout(
std::time::Duration::from_secs(5),
kube::Client::try_default(),
)
.await??;
let mut configmap_reflector: std::pin::Pin<Box<dyn futures::Stream<Item = _> + Send>> =
if let Some(cns) = config_namespace {
Box::pin(super::update_filters_from_configmap(
client.clone(),
cns,
filters,
))
} else {
Box::pin(futures::stream::pending())
};
let gameserver_reflector = super::update_endpoints_from_gameservers(
client,
gameservers_namespace,
clusters,
locality,
address_selector,
);
tokio::pin!(gameserver_reflector);
loop {
let result = tokio::select! {
result = configmap_reflector.try_next() => result,
result = gameserver_reflector.try_next() => result,
_ = tokio::time::sleep(crate::providers::NO_UPDATE_INTERVAL) => {
tracing::trace!(duration_secs=crate::providers::NO_UPDATE_INTERVAL.as_secs_f64(), "no updates from gameservers or configmap");
Ok(Some(()))
}
};
match result
.and_then(|opt| opt.ok_or_else(|| eyre::eyre!("kubernetes watch stream terminated")))
{
Ok(_) => {
crate::metrics::k8s::active(true);
health_check.store(true, Ordering::SeqCst);
}
Err(error) => break Err(error),
}
}
}
#[derive(Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GameServer {
#[schemars(skip)]
pub metadata: ObjectMeta,
pub spec: GameServerSpec,
pub status: Option<GameServerStatus>,
}
impl GameServer {
pub fn endpoint(
&self,
address_selector: Option<&crate::config::AddressSelector>,
) -> Option<Endpoint> {
self.status.as_ref().and_then(|status| {
let port = status
.ports
.as_ref()
.and_then(|ports| ports.first().map(|status| status.port))
.unwrap_or_default();
let tokens = self.tokens();
let extra_metadata = {
let mut map = serde_json::Map::default();
map.insert(
"name".into(),
self.metadata.name.clone().unwrap_or_default().into(),
);
map
};
let address = if let Some(ads) = address_selector {
status.addresses.iter().find_map(|adr| {
if adr.type_ != ads.name {
return None;
}
use crate::config::AddrKind;
match ads.kind {
AddrKind::Any => Some(adr.address.clone()),
AddrKind::Ipv4 => (!adr.address.contains(':')).then(|| adr.address.clone()),
AddrKind::Ipv6 => adr.address.contains(':').then(|| adr.address.clone()),
}
})?
} else {
status.address.clone()
};
let ep = Endpoint::with_metadata(
(address, port).into(),
crate::net::endpoint::metadata::MetadataView::with_unknown(
crate::net::endpoint::Metadata { tokens },
extra_metadata,
),
);
Some(ep)
})
}
#[inline]
fn tokens(&self) -> quilkin_types::TokenSet {
self.metadata
.annotations
.as_ref()
.and_then(|anno| {
anno.get(QUILKIN_TOKEN_LABEL).map(|value| {
value
.split(',')
.map(crate::codec::base64::decode)
.filter_map(Result::ok)
.collect()
})
})
.unwrap_or_default()
}
}
#[derive(Clone, Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Inner {
#[schemars(skip)]
metadata: ObjectMeta,
spec: GameServerSpec,
status: Option<GameServerStatus>,
}
impl<'de> serde::Deserialize<'de> for GameServer {
fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
use serde::de::Error;
let value = serde_json::Value::deserialize(de).unwrap();
serde_json::from_value::<Inner>(value.clone())
.map_err(|error| {
tracing::trace!(%error, %value, "gameserver failed");
Error::custom(error)
})
.map(
|Inner {
metadata,
spec,
status,
}| Self {
metadata,
spec,
status,
},
)
}
}
impl GameServer {
pub fn new(name: &str, spec: GameServerSpec) -> Self {
Self {
metadata: ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
spec,
status: None,
}
}
pub fn is_allocated(&self) -> bool {
self.status.as_ref().is_some_and(|status| {
tracing::trace!(?status.addresses, ?status.state, "checking gameserver");
matches!(status.state, GameServerState::Allocated)
})
}
}
impl serde::Serialize for GameServer {
fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeStruct;
let mut obj = ser.serialize_struct("GameServer", 5)?;
obj.serialize_field("apiVersion", &GameServer::api_version(&()))?;
obj.serialize_field("kind", &GameServer::kind(&()))?;
obj.serialize_field("metadata", &self.metadata)?;
obj.serialize_field("spec", &self.spec)?;
obj.serialize_field("status", &self.status)?;
obj.end()
}
}
impl Resource for GameServer {
type DynamicType = ();
type Scope = kube::core::NamespaceResourceScope;
fn group(_: &()) -> std::borrow::Cow<'_, str> {
"agones.dev".into()
}
fn kind(_: &()) -> std::borrow::Cow<'_, str> {
"GameServer".into()
}
fn version(_: &()) -> std::borrow::Cow<'_, str> {
"v1".into()
}
fn api_version(_: &()) -> std::borrow::Cow<'_, str> {
"agones.dev/v1".into()
}
fn plural(_: &()) -> std::borrow::Cow<'_, str> {
"gameservers".into()
}
fn meta(&self) -> &ObjectMeta {
&self.metadata
}
fn meta_mut(&mut self) -> &mut ObjectMeta {
&mut self.metadata
}
}
impl kube::core::crd::v1::CustomResourceExt for GameServer {
fn crd() -> CustomResourceDefinition {
let open_api_v3_schema = Some(
schemars::generate::SchemaSettings::openapi3()
.with(|s| {
s.inline_subschemas = true;
s.meta_schema = None;
})
.with_transform(kube_core::schema::StructuralSchemaRewriter)
.into_generator()
.into_root_schema_for::<Self>(),
);
CustomResourceDefinition {
metadata: ObjectMeta {
name: Some("gameservers.agones.dev".into()),
..<_>::default()
},
spec: CustomResourceDefinitionSpec {
group: "agones.dev".into(),
scope: "Namespaced".into(),
names: CustomResourceDefinitionNames {
plural: "gameservers".into(),
singular: Some("gameserver".into()),
kind: "GameServer".into(),
..<_>::default()
},
versions: vec![CustomResourceDefinitionVersion {
name: "v1".into(),
served: true,
storage: true,
schema: Some(CustomResourceValidation {
open_api_v3_schema: serde_json::from_value(
serde_json::to_value(&open_api_v3_schema).unwrap(),
)
.unwrap(),
}),
..<_>::default()
}],
..<_>::default()
},
status: None,
}
}
fn crd_name() -> &'static str {
"gameservers.agones.dev"
}
fn api_resource() -> kube::core::dynamic::ApiResource {
kube::core::dynamic::ApiResource::erase::<Self>(&())
}
fn shortnames() -> &'static [&'static str] {
&[]
}
}
impl kube::core::object::HasSpec for GameServer {
type Spec = GameServerSpec;
fn spec(&self) -> &GameServerSpec {
&self.spec
}
fn spec_mut(&mut self) -> &mut GameServerSpec {
&mut self.spec
}
}
fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
T: Default + Deserialize<'de>,
D: serde::de::Deserializer<'de>,
{
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GameServerSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub container: Option<String>,
#[serde(deserialize_with = "deserialize_null_default")]
pub ports: Vec<GameServerPort>,
pub health: Health,
pub scheduling: SchedulingStrategy,
pub sdk_server: SdkServer,
pub template: k8s_openapi::api::core::v1::PodTemplateSpec,
}
impl Default for GameServerSpec {
fn default() -> Self {
Self {
container: None,
ports: vec![],
health: Default::default(),
scheduling: SchedulingStrategy::Packed,
sdk_server: Default::default(),
template: Default::default(),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub struct Health {
#[serde(default)]
disabled: bool,
#[serde(rename = "periodSeconds", default = "default_period_seconds")]
period_seconds: i32,
#[serde(rename = "failureThreshold", default = "default_failure_threshold")]
failure_threshold: i32,
#[serde(
rename = "initialDelaySeconds",
default = "default_initial_delay_seconds"
)]
initial_delay_seconds: i32,
}
fn default_period_seconds() -> i32 {
5
}
fn default_initial_delay_seconds() -> i32 {
5
}
fn default_failure_threshold() -> i32 {
5
}
impl Default for Health {
fn default() -> Self {
Self {
disabled: false,
period_seconds: default_period_seconds(),
failure_threshold: default_failure_threshold(),
initial_delay_seconds: default_failure_threshold(),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GameServerPort {
pub name: String,
#[serde(default)]
pub port_policy: PortPolicy,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub container: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub container_port: Option<u16>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub host_port: Option<u16>,
#[serde(default)]
pub protocol: Protocol,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GameServerStatus {
pub state: GameServerState,
pub ports: Option<Vec<GameServerStatusPort>>,
pub address: String,
#[serde(deserialize_with = "deserialize_null_default")]
pub addresses: Vec<NodeAddress>,
pub node_name: String,
pub reserved_until: Option<k8s_openapi::apimachinery::pkg::apis::meta::v1::Time>,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub enum GameServerState {
PortAllocation,
Creating,
Starting,
Scheduled,
RequestReady,
Ready,
Shutdown,
Error,
Unhealthy,
Reserved,
Allocated,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub struct GameServerStatusPort {
pub name: String,
pub port: u16,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SdkServer {
#[serde(default)]
pub log_level: SdkServerLogLevel,
#[serde(default = "default_sdk_grpc_port")]
pub grpc_port: u16,
#[serde(default = "default_sdk_http_port")]
pub http_port: u16,
}
fn default_sdk_grpc_port() -> u16 {
9357
}
fn default_sdk_http_port() -> u16 {
9358
}
impl Default for SdkServer {
fn default() -> Self {
Self {
log_level: Default::default(),
grpc_port: default_sdk_grpc_port(),
http_port: default_sdk_http_port(),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, Default)]
pub enum SdkServerLogLevel {
#[default]
Info,
Debug,
Error,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, Default)]
pub enum PortPolicy {
Static,
#[default]
Dynamic,
Passthrough,
None,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub enum SchedulingStrategy {
Packed,
Distributed,
}
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, Default)]
pub enum Protocol {
#[serde(rename = "UDP")]
#[default]
Udp,
#[serde(rename = "TCP")]
Tcp,
#[serde(rename = "TCPUDP")]
UdpTcp,
}
#[derive(Clone, Debug, JsonSchema)]
pub struct Fleet {
#[schemars(skip)]
pub metadata: ::k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta,
pub spec: FleetSpec,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<FleetStatus>,
}
impl Fleet {
pub fn new(name: &str, spec: FleetSpec) -> Self {
Self {
metadata: ::k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
spec,
status: None,
}
}
}
#[derive(Clone, Debug, Deserialize, JsonSchema)]
pub struct FleetInner {
#[schemars(skip)]
metadata: ObjectMeta,
spec: FleetSpec,
status: Option<FleetStatus>,
}
impl<'de> serde::Deserialize<'de> for Fleet {
fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
use serde::de::Error;
let value = serde_json::Value::deserialize(de).unwrap();
serde_json::from_value::<FleetInner>(value.clone())
.map_err(|error| {
tracing::trace!(%error, %value, "fleet failed");
Error::custom(error)
})
.map(
|FleetInner {
metadata,
spec,
status,
}| Self {
metadata,
spec,
status,
},
)
}
}
impl serde::Serialize for Fleet {
fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeStruct;
let mut obj = ser.serialize_struct("Fleet", 5)?;
obj.serialize_field("apiVersion", &Fleet::api_version(&()))?;
obj.serialize_field("kind", &Fleet::kind(&()))?;
obj.serialize_field("metadata", &self.metadata)?;
obj.serialize_field("spec", &self.spec)?;
obj.serialize_field("status", &self.status)?;
obj.end()
}
}
impl ::kube::core::Resource for Fleet {
type DynamicType = ();
type Scope = ::kube::core::NamespaceResourceScope;
fn group(_: &()) -> std::borrow::Cow<'_, str> {
"agones.dev".into()
}
fn kind(_: &()) -> std::borrow::Cow<'_, str> {
"Fleet".into()
}
fn version(_: &()) -> std::borrow::Cow<'_, str> {
"v1".into()
}
fn api_version(_: &()) -> std::borrow::Cow<'_, str> {
"agones.dev/v1".into()
}
fn plural(_: &()) -> std::borrow::Cow<'_, str> {
"fleets".into()
}
fn meta(&self) -> &::k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
&self.metadata
}
fn meta_mut(&mut self) -> &mut ::k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
&mut self.metadata
}
}
impl ::kube::core::crd::v1::CustomResourceExt for Fleet {
fn crd() -> ::k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition{
let columns: Vec<
::k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
> = ::serde_json::from_str("[ ]").expect("valid printer column json");
let scale: Option<
::k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceSubresourceScale,
> = None;
let categories: Vec<String> = ::serde_json::from_str("[]").expect("valid categories");
let shorts: Vec<String> = ::serde_json::from_str("[]").expect("valid shortnames");
let subres = if true {
if scale.is_some() {
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("status").into(),
::serde_json::Value::Object(::serde_json::Map::new()),
);
object.insert(("scale").into(), ::serde_json::to_value(&scale).unwrap());
object
})
} else {
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("status").into(),
::serde_json::Value::Object(::serde_json::Map::new()),
);
object
})
}
} else {
::serde_json::Value::Object(::serde_json::Map::new())
};
let r#gen = schemars::generate::SchemaSettings::openapi3()
.with(|s| {
s.inline_subschemas = true;
s.meta_schema = None;
})
.with_transform(kube_core::schema::StructuralSchemaRewriter)
.into_generator();
let schema = r#gen.into_root_schema_for::<Self>();
let jsondata = ::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("metadata").into(),
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("name").into(),
::serde_json::to_value("fleets.agones.dev").unwrap(),
);
object
}),
);
object.insert(
("spec").into(),
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("group").into(),
::serde_json::to_value("agones.dev").unwrap(),
);
object.insert(
("scope").into(),
::serde_json::to_value("Namespaced").unwrap(),
);
object.insert(
("names").into(),
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("categories").into(),
::serde_json::to_value(categories).unwrap(),
);
object.insert(
("plural").into(),
::serde_json::to_value("fleets").unwrap(),
);
object.insert(
("singular").into(),
::serde_json::to_value("fleet").unwrap(),
);
object
.insert(("kind").into(), ::serde_json::to_value("Fleet").unwrap());
object.insert(
("shortNames").into(),
::serde_json::to_value(shorts).unwrap(),
);
object
}),
);
object.insert(
("versions").into(),
::serde_json::Value::Array(<[_]>::into_vec(Box::new([
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object
.insert(("name").into(), ::serde_json::to_value("v1").unwrap());
object.insert(("served").into(), ::serde_json::Value::Bool(true));
object.insert(("storage").into(), ::serde_json::Value::Bool(true));
object.insert(
("schema").into(),
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
object.insert(
("openAPIV3Schema").into(),
::serde_json::to_value(&schema).unwrap(),
);
object
}),
);
object.insert(
("additionalPrinterColumns").into(),
::serde_json::to_value(columns).unwrap(),
);
object.insert(
("subresources").into(),
::serde_json::to_value(subres).unwrap(),
);
object
}),
]))),
);
object
}),
);
object
});
::serde_json::from_value(jsondata).expect("valid custom resource from #[kube(attrs..)]")
}
fn crd_name() -> &'static str {
"fleets.agones.dev"
}
fn api_resource() -> ::kube::core::dynamic::ApiResource {
::kube::core::dynamic::ApiResource::erase::<Self>(&())
}
fn shortnames() -> &'static [&'static str] {
&[]
}
}
impl ::kube::core::object::HasSpec for Fleet {
type Spec = FleetSpec;
fn spec(&self) -> &FleetSpec {
&self.spec
}
fn spec_mut(&mut self) -> &mut FleetSpec {
&mut self.spec
}
}
impl ::kube::core::object::HasStatus for Fleet {
type Status = FleetStatus;
fn status(&self) -> Option<&FleetStatus> {
self.status.as_ref()
}
fn status_mut(&mut self) -> &mut Option<FleetStatus> {
&mut self.status
}
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)]
pub struct FleetSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replicas: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scheduling: Option<FleetScheduling>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub strategy: Option<FleetStrategy>,
pub template: GameServerTemplateSpec,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub enum FleetScheduling {
Packed,
Distributed,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct FleetStrategy {
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "rollingUpdate"
)]
pub rolling_update: Option<FleetStrategyRollingUpdate>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub r#type: Option<FleetStrategyType>,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct FleetStrategyRollingUpdate {
#[serde(default, skip_serializing_if = "Option::is_none", rename = "maxSurge")]
pub max_surge: Option<IntOrString>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "maxUnavailable"
)]
pub max_unavailable: Option<IntOrString>,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub enum FleetStrategyType {
Recreate,
RollingUpdate,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Default)]
pub struct GameServerTemplateSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<ObjectMeta>,
pub spec: GameServerSpec,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct FleetStatus {
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "allocatedReplicas"
)]
pub allocated_replicas: Option<i64>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "readyReplicas"
)]
pub ready_replicas: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replicas: Option<i64>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
rename = "reservedReplicas"
)]
pub reserved_replicas: Option<i64>,
}